Author: ssc
Date: Sun Nov 13 17:00:36 2011
New Revision: 1201457
URL: http://svn.apache.org/viewvc?rev=1201457&view=rev
Log:
MAHOUT-879 Remove all graph algorithms with the exception of PageRank
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/AdjacencyMatrixJob.java
mahout/trunk/src/conf/pagerank.props
mahout/trunk/src/conf/randomwalkwithrestart.props
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/
mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/
mahout/trunk/core/src/main/java/org/apache/mahout/graph/preprocessing/
mahout/trunk/core/src/test/java/org/apache/mahout/graph/GraphTestCase.java
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/
mahout/trunk/core/src/test/java/org/apache/mahout/graph/preprocessing/
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALSUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/IntPairWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalk.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
mahout/trunk/src/conf/driver.classes.props
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
Sun Nov 13 17:00:36 2011
@@ -77,24 +77,6 @@ public final class TasteHadoopUtils {
return indexItemIDMap;
}
- public static void writeInt(int value, Path path, Configuration conf) throws
IOException {
- FileSystem fs = FileSystem.get(path.toUri(), conf);
- FSDataOutputStream out = fs.create(path);
- try {
- out.writeInt(value);
- } finally {
- Closeables.closeQuietly(out);
- }
- }
- public static int readInt(Path path, Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(path.toUri(), conf);
- FSDataInputStream in = fs.open(path);
- try {
- return in.readInt();
- } finally {
- Closeables.closeQuietly(in);
- }
- }
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALSUtils.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALSUtils.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALSUtils.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALSUtils.java
Sun Nov 13 17:00:36 2011
@@ -25,6 +25,7 @@ import org.apache.mahout.common.iterator
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntObjectHashMap;
@@ -60,29 +61,4 @@ public class ALSUtils {
}
return matrix;
}
-
- public static String nice(Vector v) {
- if (!v.isSequentialAccess()) {
- v = new DenseVector(v);
- }
-
- DecimalFormat df = new DecimalFormat("0.00",
DecimalFormatSymbols.getInstance(Locale.ENGLISH));
-
- StringBuilder buffer = new StringBuilder("[");
- String separator = "";
- for (Vector.Element e : v) {
- buffer.append(separator);
- if (!Double.isNaN(e.get())) {
- if (e.get() >= 0) {
- buffer.append(" ");
- }
- buffer.append(df.format(e.get()));
- } else {
- buffer.append(" - ");
- }
- separator = "\t";
- }
- buffer.append(" ]");
- return buffer.toString();
- }
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
Sun Nov 13 17:00:36 2011
@@ -159,7 +159,7 @@ public final class RecommenderJob extend
"--booleanData", String.valueOf(booleanData),
"--tempDir", getTempPath().toString()});
- numberOfUsers = TasteHadoopUtils.readInt(new Path(prepPath,
PreparePreferenceMatrixJob.NUM_USERS), getConf());
+ numberOfUsers = HadoopUtil.readInt(new Path(prepPath,
PreparePreferenceMatrixJob.NUM_USERS), getConf());
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
Sun Nov 13 17:00:36 2011
@@ -31,6 +31,7 @@ import org.apache.mahout.cf.taste.hadoop
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer;
import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
@@ -86,7 +87,7 @@ public class PreparePreferenceMatrixJob
toUserVectors.waitForCompletion(true);
//we need the number of users later
int numberOfUsers = (int)
toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
- TasteHadoopUtils.writeInt(numberOfUsers, getOutputPath(NUM_USERS),
getConf());
+ HadoopUtil.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());
//build the rating matrix
Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS),
getOutputPath(RATING_MATRIX),
ToItemVectorsMapper.class, IntWritable.class,
VectorWritable.class, ToItemVectorsReducer.class,
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
Sun Nov 13 17:00:36 2011
@@ -39,6 +39,7 @@ import org.apache.mahout.cf.taste.hadoop
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import
org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
@@ -137,7 +138,7 @@ public final class ItemSimilarityJob ext
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- int numberOfUsers = TasteHadoopUtils.readInt(new Path(prepPath,
PreparePreferenceMatrixJob.NUM_USERS),
+ int numberOfUsers = HadoopUtil.readInt(new Path(prepPath,
PreparePreferenceMatrixJob.NUM_USERS),
getConf());
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
Sun Nov 13 17:00:36 2011
@@ -24,12 +24,10 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
+import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -254,4 +252,24 @@ public final class HadoopUtil {
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
}
+
+ public static void writeInt(int value, Path path, Configuration conf) throws
IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ FSDataOutputStream out = fs.create(path);
+ try {
+ out.writeInt(value);
+ } finally {
+ Closeables.closeQuietly(out);
+ }
+ }
+
+ public static int readInt(Path path, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ FSDataInputStream in = fs.open(path);
+ try {
+ return in.readInt();
+ } finally {
+ Closeables.closeQuietly(in);
+ }
+ }
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/IntPairWritable.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntPairWritable.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/common/IntPairWritable.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/common/IntPairWritable.java
Sun Nov 13 17:00:36 2011
@@ -30,7 +30,8 @@ import java.util.Arrays;
/**
* A {@link WritableComparable} which encapsulates an ordered pair of signed
integers.
*/
-public final class IntPairWritable
+public final class
+ IntPairWritable
extends BinaryComparable
implements WritableComparable<BinaryComparable>, Serializable, Cloneable {
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/AdjacencyMatrixJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/AdjacencyMatrixJob.java?rev=1201457&view=auto
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/graph/AdjacencyMatrixJob.java
(added)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/graph/AdjacencyMatrixJob.java
Sun Nov 13 17:00:36 2011
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.common.mapreduce.VectorSumReducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * <p>Distributed computation of the adjacency matrix of a directed graph, see
http://en.wikipedia.org/wiki/Adjacency_matrix</p>
+ *
+ * <p>This job outputs {@link org.apache.hadoop.io.SequenceFile}s with an {@link
IntWritable} as key and a {@link VectorWritable} as value</p>
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>--output=(path): output path where the resulting matrix should be
written</li>
+ * <li>--vertices=(path): file containing a list of all vertices</li>
+ * <li>--edges=(path): Directory containing edges of the graph</li>
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments
must appear before all other arguments.</p>
+ */
+public class AdjacencyMatrixJob extends AbstractJob {
+
+ public static final String NUM_VERTICES = "numVertices.bin";
+ public static final String ADJACENCY_MATRIX = "adjacencyMatrix";
+ public static final String VERTEX_INDEX = "vertexIndex";
+
+ static final String NUM_VERTICES_PARAM = AdjacencyMatrixJob.class.getName()
+ ".numVertices";
+ static final String VERTEX_INDEX_PARAM = AdjacencyMatrixJob.class.getName()
+ ".vertexIndex";
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ addOption("vertices", null, "a text file containing all vertices of the
graph (one per line)", true);
+ addOption("edges", null, "text files containing the edges of the graph
(vertexA,vertextB per line)", true);
+ addOutputOption();
+
+ Map<String, String> parsedArgs = parseArguments(args);
+
+ Path vertices = new Path(parsedArgs.get("--vertices"));
+ Path edges = new Path(parsedArgs.get("--edges"));
+
+ int numVertices = indexVertices(vertices, getOutputPath(VERTEX_INDEX));
+
+ HadoopUtil.writeInt(numVertices, getOutputPath(NUM_VERTICES), getConf());
+
+ Preconditions.checkArgument(numVertices > 0);
+
+ Job createAdjacencyMatrix = prepareJob(edges,
getOutputPath(ADJACENCY_MATRIX), TextInputFormat.class,
+ VectorizeEdgesMapper.class, IntWritable.class, VectorWritable.class,
VectorSumReducer.class,
+ IntWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
+ createAdjacencyMatrix.setCombinerClass(VectorSumReducer.class);
+ Configuration createAdjacencyMatrixConf =
createAdjacencyMatrix.getConfiguration();
+ createAdjacencyMatrixConf.set(NUM_VERTICES_PARAM,
String.valueOf(numVertices));
+ createAdjacencyMatrixConf.set(VERTEX_INDEX_PARAM,
getOutputPath(VERTEX_INDEX).toString());
+ createAdjacencyMatrix.waitForCompletion(true);
+
+ return 0;
+ }
+
+ //TODO do this in parallel?
+ private int indexVertices(Path verticesPath, Path indexPath) throws
IOException {
+ FileSystem fs = FileSystem.get(verticesPath.toUri(), getConf());
+ SequenceFile.Writer writer = null;
+ int index = 0;
+
+ try {
+ writer = SequenceFile.createWriter(fs, getConf(), indexPath,
IntWritable.class, IntWritable.class);
+
+ for (FileStatus fileStatus : fs.listStatus(verticesPath)) {
+ InputStream in = null;
+ try {
+ in = HadoopUtil.openStream(fileStatus.getPath(), getConf());
+ for (String line : new FileLineIterable(in)) {
+ writer.append(new IntWritable(index++), new
IntWritable(Integer.parseInt(line)));
+ }
+ } finally {
+ Closeables.closeQuietly(in);
+ }
+ }
+ } finally {
+ Closeables.closeQuietly(writer);
+ }
+
+ return index;
+ }
+
+ static class VectorizeEdgesMapper extends
Mapper<LongWritable,Text,IntWritable,VectorWritable> {
+
+ private int numVertices;
+ private OpenIntIntHashMap vertexIDsToIndex;
+
+ private final IntWritable row = new IntWritable();
+
+ private static final Pattern SEPARATOR = Pattern.compile("[\t,]");
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException
{
+ Configuration conf = ctx.getConfiguration();
+ numVertices = Integer.parseInt(conf.get(NUM_VERTICES_PARAM));
+ Path vertexIndexPath = new Path(conf.get(VERTEX_INDEX_PARAM));
+ vertexIDsToIndex = new OpenIntIntHashMap(numVertices);
+ for (Pair<IntWritable,IntWritable> indexAndVertexID :
+ new SequenceFileIterable<IntWritable,IntWritable>(vertexIndexPath,
true, conf)) {
+ vertexIDsToIndex.put(indexAndVertexID.getSecond().get(),
indexAndVertexID.getFirst().get());
+ }
+ }
+
+ @Override
+ protected void map(LongWritable offset, Text line, Mapper.Context ctx)
+ throws IOException, InterruptedException {
+
+ String[] tokens = SEPARATOR.split(line.toString());
+ int rowIndex = vertexIDsToIndex.get(Integer.parseInt(tokens[0]));
+ int columnIndex = vertexIDsToIndex.get(Integer.parseInt(tokens[1]));
+ RandomAccessSparseVector partialTransitionMatrixRow = new
RandomAccessSparseVector(numVertices, 1);
+
+ row.set(rowIndex);
+ partialTransitionMatrixRow.setQuick(columnIndex, 1);
+
+ ctx.write(row, new VectorWritable(partialTransitionMatrixRow));
+ }
+ }
+
+}
\ No newline at end of file
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
Sun Nov 13 17:00:36 2011
@@ -17,36 +17,24 @@
package org.apache.mahout.graph.linkanalysis;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.graph.model.Edge;
-import org.apache.mahout.graph.model.Vertex;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
/**
* <p>Distributed computation of the PageRank a directed graph</p>
*
- * <p>The input files need to be a {@link org.apache.hadoop.io.SequenceFile}
with {@link Edge}s as keys and
- * any Writable as values and another {@link
org.apache.hadoop.io.SequenceFile} with {@link IntWritable}s as keys and {@link
Vertex} as
- * values, as produced by {@link
org.apache.mahout.graph.preprocessing.GraphUtils#indexVertices(Configuration,
Path, Path)}</p>
- *
* <p>This job outputs text files with a vertex id and its pagerank per
line.</p>
*
* <p>Command line arguments specific to this class are:</p>
*
* <ol>
* <li>--output=(path): output path</li>
- * <li>--vertexIndex=(path): Directory containing vertex index as created by
GraphUtils.indexVertices()</li>
- * <li>--edges=(path): Directory containing edges of the graph</li>
- * <li>--numVertices=(Integer): number of vertices in the graph</li>
- * <li>--numIterations=(Integer): number of numIterations, default: 5</li>
- * <li>--stayingProbability=(Double): probability not to teleport to a random
vertex, default: 0.8</li>
+ * <li>--vertices=(path): file containing the list of vertices of the graph
(one per line)</li>
+ * <li>--edges=(path): directory containing edges of the graph (pair of vertex
ids per line in textformat)</li>
+ * <li>--numIterations=(Integer): number of iterations, default: 10</li>
+ * <li>--stayingProbability=(Double): probability not to teleport to a random
vertex, default: 0.85</li>
* </ol>
*
* <p>General command line options are documented in {@link AbstractJob}.</p>
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalk.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalk.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalk.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalk.java
Sun Nov 13 17:00:36 2011
@@ -19,6 +19,7 @@ package org.apache.mahout.graph.linkanal
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
@@ -30,23 +31,30 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.graph.model.Vertex;
-import org.apache.mahout.graph.preprocessing.AdjacencyMatrixJob;
-import org.apache.mahout.graph.preprocessing.GraphUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.mapreduce.MergeVectorsCombiner;
+import org.apache.mahout.common.mapreduce.MergeVectorsReducer;
+import org.apache.mahout.graph.AdjacencyMatrixJob;
import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
-public abstract class RandomWalk extends AbstractJob {
+abstract class RandomWalk extends AbstractJob {
- static final String ADJACENCY_MATRIX = "adjacencyMatrix";
static final String RANK_VECTOR = "rankVector";
+ static final String NUM_VERTICES_PARAM = AdjacencyMatrixJob.class.getName()
+ ".numVertices";
+ static final String STAYING_PROBABILITY_PARAM =
AdjacencyMatrixJob.class.getName() + ".stayingProbability";
+
protected abstract Vector createDampingVector(int numVertices, double
stayingProbability);
protected void addSpecificOptions() {}
@@ -55,11 +63,10 @@ public abstract class RandomWalk extends
@Override
public final int run(String[] args) throws Exception {
addOutputOption();
- addOption("vertexIndex", "vi", "vertex index as created by
GraphUtils.indexVertices()", true);
- addOption("edges", "e", "edges of the graph", true);
- addOption("numVertices", "nv", "number of vertices in the graph", true);
- addOption("numIterations", "it", "number of numIterations",
String.valueOf(5));
- addOption("stayingProbability", "tp", "probability not to teleport to a
random vertex", String.valueOf(0.8));
+ addOption("vertices", null, "a text file containing all vertices of the
graph (one per line)", true);
+ addOption("edges", null, "edges of the graph", true);
+ addOption("numIterations", "it", "number of numIterations",
String.valueOf(10));
+ addOption("stayingProbability", "tp", "probability not to teleport to a
random vertex", String.valueOf(0.85));
addSpecificOptions();
@@ -70,38 +77,47 @@ public abstract class RandomWalk extends
evaluateSpecificOptions(parsedArgs);
- Path vertexIndex = new Path(parsedArgs.get("--vertexIndex"));
- Path edges = new Path(parsedArgs.get("--edges"));
-
- int numVertices = Integer.parseInt(parsedArgs.get("--numVertices"));
int numIterations = Integer.parseInt(parsedArgs.get("--numIterations"));
double stayingProbability =
Double.parseDouble(parsedArgs.get("--stayingProbability"));
- Preconditions.checkArgument(numVertices > 0);
Preconditions.checkArgument(numIterations > 0);
Preconditions.checkArgument(stayingProbability > 0.0 && stayingProbability
<= 1.0);
- /* create the substochastified adjacency matrix */
- ToolRunner.run(getConf(), new AdjacencyMatrixJob(), new String[] {
"--vertexIndex", vertexIndex.toString(),
- "--edges", edges.toString(), "--output",
getTempPath(ADJACENCY_MATRIX).toString(),
- "--numVertices", String.valueOf(numVertices), "--stayingProbability",
String.valueOf(stayingProbability),
- "--substochastify", String.valueOf(true), "--tempDir",
getTempPath().toString() });
+ Path adjacencyMatrixPath =
getTempPath(AdjacencyMatrixJob.ADJACENCY_MATRIX);
+ Path transitionMatrixPath = getTempPath("transitionMatrix");
+ Path vertexIndexPath = getTempPath(AdjacencyMatrixJob.VERTEX_INDEX);
+ Path numVerticesPath = getTempPath(AdjacencyMatrixJob.NUM_VERTICES);
+
+ /* create the adjacency matrix */
+ ToolRunner.run(getConf(), new AdjacencyMatrixJob(), new String[] {
"--vertices", parsedArgs.get("--vertices"),
+ "--edges", parsedArgs.get("--edges"), "--output",
getTempPath().toString() });
+
+ int numVertices = HadoopUtil.readInt(numVerticesPath, getConf());
+ Preconditions.checkArgument(numVertices > 0);
+
+ /* transpose and stochastify the adjacency matrix to create the transition
matrix */
+ Job createTransitionMatrix = prepareJob(adjacencyMatrixPath,
transitionMatrixPath, TransposeMapper.class,
+ IntWritable.class, VectorWritable.class, MergeVectorsReducer.class,
IntWritable.class, VectorWritable.class);
+ createTransitionMatrix.setCombinerClass(MergeVectorsCombiner.class);
+ createTransitionMatrix.getConfiguration().set(NUM_VERTICES_PARAM,
String.valueOf(numVertices));
+ createTransitionMatrix.getConfiguration().set(STAYING_PROBABILITY_PARAM,
String.valueOf(stayingProbability));
+ createTransitionMatrix.waitForCompletion(true);
- DistributedRowMatrix adjacencyMatrix = new
DistributedRowMatrix(getTempPath(ADJACENCY_MATRIX), getTempPath(),
+ DistributedRowMatrix transitionMatrix = new
DistributedRowMatrix(transitionMatrixPath, getTempPath(),
numVertices, numVertices);
- adjacencyMatrix.setConf(getConf());
+ transitionMatrix.setConf(getConf());
Vector ranking = new DenseVector(numVertices).assign(1.0 / numVertices);
Vector dampingVector = createDampingVector(numVertices,
stayingProbability);
- /* power method: iterative adjacency-matrix times ranking-vector
multiplication */
+ /* power method: iterative transition-matrix times ranking-vector
multiplication */
while (numIterations-- > 0) {
- ranking = adjacencyMatrix.times(ranking).plus(dampingVector);
+ ranking = transitionMatrix.times(ranking).plus(dampingVector);
}
- GraphUtils.persistVector(getConf(), getTempPath(RANK_VECTOR), ranking);
+ persistVector(getConf(), getTempPath(RANK_VECTOR), ranking);
- Job vertexWithPageRank = prepareJob(vertexIndex, getOutputPath(),
SequenceFileInputFormat.class,
+ Job vertexWithPageRank = prepareJob(vertexIndexPath, getOutputPath(),
SequenceFileInputFormat.class,
RankPerVertexMapper.class, LongWritable.class, DoubleWritable.class,
TextOutputFormat.class);
vertexWithPageRank.getConfiguration().set(RankPerVertexMapper.RANK_PATH_PARAM,
getTempPath(RANK_VECTOR).toString());
@@ -110,7 +126,51 @@ public abstract class RandomWalk extends
return 1;
}
- public static class RankPerVertexMapper extends
Mapper<IntWritable,Vertex,LongWritable,DoubleWritable> {
+ static void persistVector(Configuration conf, Path path, Vector vector)
throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ DataOutputStream out = null;
+ try {
+ out = fs.create(path, true);
+ VectorWritable.writeVector(out, vector);
+ } finally {
+ Closeables.closeQuietly(out);
+ }
+ }
+
+ static class TransposeMapper extends
Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+ private int numVertices;
+ private double stayingProbability;
+
+ @Override
+ protected void setup(Mapper.Context ctx) throws IOException,
InterruptedException {
+ stayingProbability =
Double.parseDouble(ctx.getConfiguration().get(STAYING_PROBABILITY_PARAM));
+ numVertices =
Integer.parseInt(ctx.getConfiguration().get(NUM_VERTICES_PARAM));
+ }
+
+ @Override
+ protected void map(IntWritable r, VectorWritable v, Context ctx) throws
IOException, InterruptedException {
+ int rowIndex = r.get();
+
+ Vector row = v.get();
+ row = row.normalize(1);
+ if (stayingProbability != 1.0) {
+ row.assign(Functions.MULT, stayingProbability);
+ }
+
+ Iterator<Vector.Element> it = row.iterateNonZero();
+ while (it.hasNext()) {
+ Vector.Element e = it.next();
+ RandomAccessSparseVector tmp = new
RandomAccessSparseVector(numVertices, 1);
+ tmp.setQuick(rowIndex, e.get());
+ r.set(e.index());
+ ctx.write(r, new VectorWritable(tmp));
+ }
+ }
+ }
+
+
+ public static class RankPerVertexMapper extends
Mapper<IntWritable,IntWritable,IntWritable,DoubleWritable> {
static final String RANK_PATH_PARAM = RankPerVertexMapper.class.getName()
+ ".pageRankPath";
@@ -128,8 +188,9 @@ public abstract class RandomWalk extends
}
@Override
- protected void map(IntWritable index, Vertex vertex, Mapper.Context ctx)
throws IOException, InterruptedException {
- ctx.write(new LongWritable(vertex.id()), new
DoubleWritable(ranks.get(index.get())));
+ protected void map(IntWritable index, IntWritable vertex, Mapper.Context
ctx)
+ throws IOException, InterruptedException {
+ ctx.write(vertex, new DoubleWritable(ranks.get(index.get())));
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJob.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJob.java
Sun Nov 13 17:00:36 2011
@@ -17,33 +17,26 @@
package org.apache.mahout.graph.linkanalysis;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.graph.model.Vertex;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import java.util.Map;
/**
- * <p>Distributed computation of multiple random walks in a directed graph,
one for each source vertex given</p>
+ * <p>Distributed computation of the proximities of vertices to a source
vertex in a directed graph</p>
*
- * <p>The input files need to be a {@link org.apache.hadoop.io.SequenceFile}
with {@link org.apache.mahout.graph.model.Edge}s as keys and
- * any Writable as values and another {@link
org.apache.hadoop.io.SequenceFile} with {@link IntWritable}s as keys and {@link
Vertex} as
- * values, as produced by {@link
org.apache.mahout.graph.preprocessing.GraphUtils )}</p>
- *
- * <p>This job outputs text files with a source vertex id, a reached vertex id
and its score</p>
+ * <p>This job outputs text files with a vertex id and its pagerank per
line.</p>
*
* <p>Command line arguments specific to this class are:</p>
*
* <ol>
- * <li>-Dmapred.output.dir=(path): output path</li>
- * <li>--vertexIndex=(path): Directory containing vertex index as created by
GraphUtils.indexVertices()</li>
- * <li>--edges=(path): Directory containing edges of the graph</li>
- * <li>--sourceVertexIndex (Integer): index of source vertex</li>
- * <li>--numVertices=(Integer): number of vertices in the graph</li>
- * <li>--numIterations=(Integer): number of numIterations, default: 5</li>
- * <li>--stayingProbability=(Double): probability not to teleport to a random
vertex, default: 0.8</li>
+ * <li>--output=(path): output path</li>
+ * <li>--vertices=(path): file containing the list of vertices of the graph
(one per line)</li>
+ * <li>--sourceVertexIndex=(int): index of the source vertex for the random
walk (line number in vertices file)</li>
+ * <li>--edges=(path): directory containing edges of the graph (pair of vertex
ids per line in textformat)</li>
+ * <li>--numIterations=(Integer): number of iterations, default: 10</li>
+ * <li>--stayingProbability=(Double): probability not to teleport to a random
vertex, default: 0.85</li>
* </ol>
*
* <p>General command line options are documented in {@link AbstractJob}.</p>
@@ -67,7 +60,7 @@ public class RandomWalkWithRestartJob ex
@Override
protected void addSpecificOptions() {
- addOption("sourceVertexIndex", "svi", "index of source vertex", true);
+ addOption("sourceVertexIndex", null, "index of source vertex", true);
}
@Override
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
Sun Nov 13 17:00:36 2011
@@ -103,14 +103,14 @@ public class ParallelALSFactorizationJob
StringBuilder info = new StringBuilder();
info.append("\nA - users x items\n\n");
- info.append(nice(preferences));
+ info.append(MathHelper.nice(preferences));
info.append("\nU - users x features\n\n");
- info.append(nice(u));
+ info.append(MathHelper.nice(u));
info.append("\nM - items x features\n\n");
- info.append(nice(m));
+ info.append(MathHelper.nice(m));
Matrix Ak = u.times(m.transpose());
info.append("\nAk - users x items\n\n");
- info.append(nice(Ak));
+ info.append(MathHelper.nice(Ak));
info.append("\n");
log.info(info.toString());
@@ -175,16 +175,16 @@ public class ParallelALSFactorizationJob
StringBuilder info = new StringBuilder();
info.append("\nObservations - users x items\n");
- info.append(nice(observations));
+ info.append(MathHelper.nice(observations));
info.append("\nA - users x items\n\n");
- info.append(nice(preferences));
+ info.append(MathHelper.nice(preferences));
info.append("\nU - users x features\n\n");
- info.append(nice(u));
+ info.append(MathHelper.nice(u));
info.append("\nM - items x features\n\n");
- info.append(nice(m));
+ info.append(MathHelper.nice(m));
Matrix Ak = u.times(m.transpose());
info.append("\nAk - users x items\n\n");
- info.append(nice(Ak));
+ info.append(MathHelper.nice(Ak));
info.append("\n");
log.info(info.toString());
@@ -231,11 +231,5 @@ public class ParallelALSFactorizationJob
return prefsAsText.toString();
}
- protected StringBuilder nice(Matrix matrix) {
- StringBuilder info = new StringBuilder();
- for (int n = 0; n < matrix.numRows(); n++) {
- info.append(ALSUtils.nice(matrix.viewRow(n))).append("\n");
- }
- return info;
- }
+
}
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
Sun Nov 13 17:00:36 2011
@@ -22,27 +22,29 @@ import com.google.common.collect.Iterabl
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.iterator.FileLineIterable;
-import org.apache.mahout.graph.GraphTestCase;
-import org.apache.mahout.graph.model.Edge;
-import org.apache.mahout.graph.preprocessing.GraphUtils;
+import org.apache.mahout.graph.AdjacencyMatrixJob;
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.hadoop.MathHelper;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Map;
-/** example from "Mining Massive Datasets" */
-public final class PageRankJobTest extends GraphTestCase {
+/** example from "Mining Massive Datasets" (page 157) */
+public final class PageRankJobTest extends MahoutTestCase {
+ private static final Logger log =
LoggerFactory.getLogger(PageRankJobTest.class);
@Test
public void toyIntegrationTest() throws Exception {
File verticesFile = getTestTempFile("vertices.txt");
- File edgesFile = getTestTempFile("edges.seq");
- File indexedVerticesFile = getTestTempFile("indexedVertices.seq");
+ File edgesFile = getTestTempFile("edges.txt");
File outputDir = getTestTempDir("output");
outputDir.delete();
File tempDir = getTestTempDir();
@@ -51,48 +53,72 @@ public final class PageRankJobTest exten
writeLines(verticesFile, "12", "34", "56", "78");
- writeComponents(edgesFile, conf, Edge.class,
- new Edge(12, 34),
- new Edge(12, 56),
- new Edge(12, 78),
- new Edge(34, 12),
- new Edge(34, 78),
- new Edge(56, 56),
- new Edge(78, 34),
- new Edge(78, 56));
-
- int numVertices = GraphUtils.indexVertices(conf, new
Path(verticesFile.getAbsolutePath()),
- new Path(indexedVerticesFile.getAbsolutePath()));
+ writeLines(edgesFile,
+ "12,34",
+ "12,56",
+ "12,78",
+ "34,12",
+ "34,78",
+ "56,56",
+ "78,34",
+ "78,56");
PageRankJob pageRank = new PageRankJob();
pageRank.setConf(conf);
- pageRank.run(new String[] { "--vertexIndex",
indexedVerticesFile.getAbsolutePath(),
- "--edges", edgesFile.getAbsolutePath(), "--output",
outputDir.getAbsolutePath(),
- "--numVertices", String.valueOf(numVertices), "--numIterations", "3",
- "--stayingProbability", "0.8", "--tempDir", tempDir.getAbsolutePath()
});
+ pageRank.run(new String[] { "--vertices", verticesFile.getAbsolutePath(),
"--edges", edgesFile.getAbsolutePath(),
+ "--output", outputDir.getAbsolutePath(), "--numIterations", "3",
"--stayingProbability", "0.8",
+ "--tempDir", tempDir.getAbsolutePath() });
+
+ Matrix expectedAdjacencyMatrix = new DenseMatrix(new double[][] {
+ { 0, 1, 1, 1 },
+ { 1, 0, 0, 1 },
+ { 0, 0, 1, 0 },
+ { 0, 1, 1, 0 } });
+
+ int numVertices = HadoopUtil.readInt(new Path(tempDir.getAbsolutePath(),
AdjacencyMatrixJob.NUM_VERTICES), conf);
+ assertEquals(4, numVertices);
+ Matrix actualAdjacencyMatrix = MathHelper.readMatrix(conf, new
Path(tempDir.getAbsolutePath(),
+ AdjacencyMatrixJob.ADJACENCY_MATRIX + "/part-r-00000"), numVertices,
numVertices);
- Matrix expectedAdjacenyMatrix = new DenseMatrix(new double[][] {
+ StringBuilder info = new StringBuilder();
+ info.append("\nexpected adjacency matrix\n\n");
+ info.append(MathHelper.nice(expectedAdjacencyMatrix));
+ info.append("\nactual adjacency matrix \n\n");
+ info.append(MathHelper.nice(actualAdjacencyMatrix));
+ info.append("\n");
+ log.info(info.toString());
+
+ Matrix expectedTransitionMatrix = new DenseMatrix(new double[][] {
{ 0.0, 0.4, 0.0, 0.0 },
{ 0.266666667, 0.0, 0.0, 0.4 },
{ 0.266666667, 0.0, 0.8, 0.4 },
{ 0.266666667, 0.4, 0.0, 0.0 } });
- Matrix actualAdjacencyMatrix = MathHelper.readMatrix(conf, new
Path(tempDir.getAbsolutePath(),
- "adjacencyMatrix/part-r-00000"), numVertices, numVertices);
+ Matrix actualTransitionMatrix = MathHelper.readMatrix(conf, new
Path(tempDir.getAbsolutePath(),
+ "transitionMatrix/part-r-00000"), numVertices, numVertices);
+
+ info = new StringBuilder();
+ info.append("\nexpected transition matrix\n\n");
+ info.append(MathHelper.nice(expectedTransitionMatrix));
+ info.append("\nactual transition matrix\n\n");
+ info.append(MathHelper.nice(actualTransitionMatrix));
+ info.append("\n");
+ log.info(info.toString());
- assertMatrixEquals(expectedAdjacenyMatrix, actualAdjacencyMatrix);
+ MathHelper.assertMatrixEquals(expectedAdjacencyMatrix,
actualAdjacencyMatrix);
+ MathHelper.assertMatrixEquals(expectedTransitionMatrix,
actualTransitionMatrix);
- Map<Long,Double> rankPerVertex = Maps.newHashMap();
+ Map<Integer,Double> rankPerVertex =
Maps.newHashMapWithExpectedSize(numVertices);
for (CharSequence line : new FileLineIterable(new File(outputDir,
"part-m-00000"))) {
String[] tokens = Iterables.toArray(Splitter.on("\t").split(line),
String.class);
- rankPerVertex.put(Long.parseLong(tokens[0]),
Double.parseDouble(tokens[1]));
+ rankPerVertex.put(Integer.parseInt(tokens[0]),
Double.parseDouble(tokens[1]));
}
assertEquals(4, rankPerVertex.size());
- assertEquals(0.1206666, rankPerVertex.get(12L), EPSILON);
- assertEquals(0.1571111, rankPerVertex.get(34L), EPSILON);
- assertEquals(0.5651111, rankPerVertex.get(56L), EPSILON);
- assertEquals(0.1571111, rankPerVertex.get(78L), EPSILON);
+ assertEquals(0.1206666, rankPerVertex.get(12), EPSILON);
+ assertEquals(0.1571111, rankPerVertex.get(34), EPSILON);
+ assertEquals(0.5651111, rankPerVertex.get(56), EPSILON);
+ assertEquals(0.1571111, rankPerVertex.get(78), EPSILON);
}
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJobTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJobTest.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJobTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/RandomWalkWithRestartJobTest.java
Sun Nov 13 17:00:36 2011
@@ -22,26 +22,29 @@ import com.google.common.collect.Iterabl
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.iterator.FileLineIterable;
-import org.apache.mahout.graph.GraphTestCase;
-import org.apache.mahout.graph.preprocessing.GraphUtils;
-import org.apache.mahout.graph.model.Edge;
+import org.apache.mahout.graph.AdjacencyMatrixJob;
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.hadoop.MathHelper;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Map;
-public class RandomWalkWithRestartJobTest extends GraphTestCase {
+public class RandomWalkWithRestartJobTest extends MahoutTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(RandomWalkWithRestartJobTest.class);
@Test
public void toyIntegrationTest() throws Exception {
File verticesFile = getTestTempFile("vertices.txt");
- File edgesFile = getTestTempFile("edges.seq");
- File indexedVerticesFile = getTestTempFile("indexedVertices.seq");
+ File edgesFile = getTestTempFile("edges.txt");
File outputDir = getTestTempDir("output");
outputDir.delete();
File tempDir = getTestTempDir();
@@ -50,38 +53,62 @@ public class RandomWalkWithRestartJobTes
writeLines(verticesFile, "12", "34", "56", "78");
- writeComponents(edgesFile, conf, Edge.class,
- new Edge(12, 34),
- new Edge(12, 56),
- new Edge(34, 34),
- new Edge(34, 78),
- new Edge(56, 12),
- new Edge(56, 34),
- new Edge(56, 56),
- new Edge(56, 78),
- new Edge(78, 34));
-
- int numVertices = GraphUtils.indexVertices(conf, new
Path(verticesFile.getAbsolutePath()),
- new Path(indexedVerticesFile.getAbsolutePath()));
+ writeLines(edgesFile,
+ "12,34",
+ "12,56",
+ "34,34",
+ "34,78",
+ "56,12",
+ "56,34",
+ "56,56",
+ "56,78",
+ "78,34");
RandomWalk randomWalkWithRestart = new RandomWalkWithRestartJob();
randomWalkWithRestart.setConf(conf);
- randomWalkWithRestart.run(new String[]{"--vertexIndex",
indexedVerticesFile.getAbsolutePath(),
+ randomWalkWithRestart.run(new String[]{"--vertices",
verticesFile.getAbsolutePath(),
"--edges", edgesFile.getAbsolutePath(), "--sourceVertexIndex",
String.valueOf(2),
- "--output", outputDir.getAbsolutePath(), "--numVertices",
String.valueOf(numVertices),
- "--numIterations", String.valueOf(2), "--stayingProbability",
String.valueOf(0.75),
- "--tempDir", tempDir.getAbsolutePath()});
+ "--output", outputDir.getAbsolutePath(), "--numIterations",
String.valueOf(2),
+ "--stayingProbability", String.valueOf(0.75), "--tempDir",
tempDir.getAbsolutePath()});
+
+ Matrix expectedAdjacencyMatrix = new DenseMatrix(new double[][] {
+ { 0, 1, 1, 0 },
+ { 0, 1, 0, 1 },
+ { 1, 1, 1, 1 },
+ { 0, 1, 0, 0 } });
+
+ int numVertices = HadoopUtil.readInt(new Path(tempDir.getAbsolutePath(),
AdjacencyMatrixJob.NUM_VERTICES), conf);
+ assertEquals(4, numVertices);
+ Matrix actualAdjacencyMatrix = MathHelper.readMatrix(conf, new
Path(tempDir.getAbsolutePath(),
+ AdjacencyMatrixJob.ADJACENCY_MATRIX + "/part-r-00000"), numVertices,
numVertices);
+
+ StringBuilder info = new StringBuilder();
+ info.append("\nexpected adjacency matrix\n\n");
+ info.append(MathHelper.nice(expectedAdjacencyMatrix));
+ info.append("\nactual adjacency matrix \n\n");
+ info.append(MathHelper.nice(actualAdjacencyMatrix));
+ info.append("\n");
+ log.info(info.toString());
- Matrix expectedAdjacenyMatrix = new DenseMatrix(new double[][] {
+ Matrix expectedTransitionMatrix = new DenseMatrix(new double[][] {
{ 0, 0, 0.1875, 0 },
{ 0.375, 0.375, 0.1875, 0.75 },
{ 0.375, 0, 0.1875, 0 },
{ 0, 0.375, 0.1875, 0 } });
- Matrix actualAdjacencyMatrix = MathHelper.readMatrix(conf, new
Path(tempDir.getAbsolutePath(),
- "adjacencyMatrix/part-r-00000"), numVertices, numVertices);
+ Matrix actualTransitionMatrix = MathHelper.readMatrix(conf, new
Path(tempDir.getAbsolutePath(),
+ "transitionMatrix/part-r-00000"), numVertices, numVertices);
+
+ info = new StringBuilder();
+ info.append("\nexpected transition matrix\n\n");
+ info.append(MathHelper.nice(expectedTransitionMatrix));
+ info.append("\nactual transition matrix\n\n");
+ info.append(MathHelper.nice(actualTransitionMatrix));
+ info.append("\n");
+ log.info(info.toString());
- assertMatrixEquals(expectedAdjacenyMatrix, actualAdjacencyMatrix);
+ MathHelper.assertMatrixEquals(expectedAdjacencyMatrix,
actualAdjacencyMatrix);
+ MathHelper.assertMatrixEquals(expectedTransitionMatrix,
actualTransitionMatrix);
Map<Long,Double> steadyStateProbabilities = Maps.newHashMap();
for (CharSequence line : new FileLineIterable(new File(outputDir,
"part-m-00000"))) {
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
Sun Nov 13 17:00:36 2011
@@ -18,7 +18,10 @@
package org.apache.mahout.math.hadoop;
import java.io.IOException;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
import java.util.Iterator;
+import java.util.Locale;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
@@ -29,15 +32,13 @@ import org.apache.hadoop.io.SequenceFile
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
-import org.apache.mahout.math.DenseMatrix;
-import org.apache.mahout.math.Matrix;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.*;
import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
import org.easymock.IArgumentMatcher;
import org.easymock.EasyMock;
+import static org.junit.Assert.assertEquals;
+
/**
* a collection of small helper methods useful for unit-testing mathematical
operations
*/
@@ -169,11 +170,13 @@ public final class MathHelper {
* read a {@link Matrix} from a SequenceFile<IntWritable,VectorWritable>
*/
public static Matrix readMatrix(Configuration conf, Path path, int rows, int
columns) {
+ boolean readOneRow = false;
Matrix matrix = new DenseMatrix(rows, columns);
for (Pair<IntWritable,VectorWritable> record :
new SequenceFileIterable<IntWritable,VectorWritable>(path, true,
conf)) {
IntWritable key = record.getFirst();
VectorWritable value = record.getSecond();
+ readOneRow = true;
int row = key.get();
Iterator<Vector.Element> elementsIterator = value.get().iterateNonZero();
while (elementsIterator.hasNext()) {
@@ -181,6 +184,9 @@ public final class MathHelper {
matrix.set(row, element.index(), element.get());
}
}
+ if (!readOneRow) {
+ throw new IllegalStateException("Not a single row read!");
+ }
return matrix;
}
@@ -203,4 +209,48 @@ public final class MathHelper {
Closeables.closeQuietly(writer);
}
}
+
+ /**
+  * Asserts that two matrices have the same dimensions and that every pair of corresponding
+  * cells is equal within {@link MahoutTestCase#EPSILON}.
+  *
+  * @param expected the matrix holding the expected dimensions and values
+  * @param actual the matrix under test
+  */
+ public static void assertMatrixEquals(Matrix expected, Matrix actual) {
+   assertEquals("Non-matching number of rows", expected.numRows(), actual.numRows());
+   // compare against the expected column count; comparing actual with itself can never fail
+   assertEquals("Non-matching number of columns", expected.numCols(), actual.numCols());
+   for (int row = 0; row < expected.numRows(); row++) {
+     for (int col = 0; col < expected.numCols(); col++) {
+       assertEquals("Non-matching values in [" + row + ',' + col + ']',
+           expected.get(row, col), actual.get(row, col), MahoutTestCase.EPSILON);
+     }
+   }
+ }
+
+ /**
+  * Renders a vector as a single bracketed line for log output.
+  *
+  * <p>Entries are separated by tabs and formatted with the pattern {@code 0.00} using English
+  * decimal symbols. Non-negative values get a leading blank, and NaN entries are printed as a
+  * dash placeholder.</p>
+  *
+  * @param v the vector to render; it is copied into a {@link DenseVector} first when it does
+  *          not report sequential access
+  * @return the formatted line
+  */
+ public static String nice(Vector v) {
+   Vector ordered = v.isSequentialAccess() ? v : new DenseVector(v);
+   DecimalFormat twoDecimals = new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.ENGLISH));
+
+   StringBuilder line = new StringBuilder("[");
+   boolean first = true;
+   for (Vector.Element entry : ordered) {
+     // every entry but the first is preceded by a tab
+     if (!first) {
+       line.append("\t");
+     }
+     first = false;
+
+     double value = entry.get();
+     if (Double.isNaN(value)) {
+       line.append(" - ");
+     } else {
+       if (value >= 0) {
+         line.append(" ");
+       }
+       line.append(twoDecimals.format(value));
+     }
+   }
+   line.append(" ]");
+   return line.toString();
+ }
+
+ /**
+  * Renders a matrix for log output, one formatted row per line.
+  *
+  * @param matrix the matrix to render
+  * @return the rows formatted by {@link #nice(Vector)}, each terminated by a newline
+  */
+ public static String nice(Matrix matrix) {
+   StringBuilder rendered = new StringBuilder();
+   int rowIndex = 0;
+   while (rowIndex < matrix.numRows()) {
+     String formattedRow = nice(matrix.viewRow(rowIndex));
+     rendered.append(formattedRow);
+     rendered.append("\n");
+     rowIndex++;
+   }
+   return rendered.toString();
+ }
}
Modified: mahout/trunk/src/conf/driver.classes.props
URL:
http://svn.apache.org/viewvc/mahout/trunk/src/conf/driver.classes.props?rev=1201457&r1=1201456&r2=1201457&view=diff
==============================================================================
--- mahout/trunk/src/conf/driver.classes.props (original)
+++ mahout/trunk/src/conf/driver.classes.props Sun Nov 13 17:00:36 2011
@@ -66,3 +66,6 @@ org.apache.mahout.cf.taste.hadoop.item.R
org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob =
parallelALS : ALS-WR factorization of a rating matrix
org.apache.mahout.cf.taste.hadoop.als.RecommenderJob = recommendfactorized :
Compute recommendations using the factorization of a rating matrix
+#Link Analysis
+org.apache.mahout.graph.linkanalysis.PageRankJob = pagerank : compute the
PageRank of a graph
+org.apache.mahout.graph.linkanalysis.RandomWalkWithRestartJob =
randomwalkwithrestart : compute all other vertices' proximity to a source
vertex in a graph
\ No newline at end of file
Added: mahout/trunk/src/conf/pagerank.props
URL:
http://svn.apache.org/viewvc/mahout/trunk/src/conf/pagerank.props?rev=1201457&view=auto
==============================================================================
(empty)
Added: mahout/trunk/src/conf/randomwalkwithrestart.props
URL:
http://svn.apache.org/viewvc/mahout/trunk/src/conf/randomwalkwithrestart.props?rev=1201457&view=auto
==============================================================================
(empty)