GIRAPH-480 Add convergence detection to org.apache.giraph.examples.RandomWalkVertex
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fc2026fa Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fc2026fa Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fc2026fa Branch: refs/heads/trunk Commit: fc2026fa1ac5225a82e47c4968801d5140f6bfa4 Parents: 40e201d Author: ssc <[email protected]> Authored: Sat Mar 9 11:36:43 2013 +0100 Committer: ssc <[email protected]> Committed: Tue Mar 19 08:58:43 2013 +0100 ---------------------------------------------------------------------- giraph-examples/pom.xml | 1 + .../LongDoubleNullDoubleTextInputFormat.java | 106 +++++++++++++++ .../org/apache/giraph/examples/PageRankVertex.java | 56 ++++++++ .../apache/giraph/examples/RandomWalkVertex.java | 98 ++++++++++---- .../examples/RandomWalkWithRestartVertex.java | 36 +++--- .../giraph/examples/RandomWalkWorkerContext.java | 26 +++-- ...texWithDoubleValueNullEdgeTextOutputFormat.java | 59 ++++++++ .../apache/giraph/examples/PageRankVertexTest.java | 89 ++++++++++++ .../giraph/examples/RandomWalkTestUtils.java | 46 +++++++ .../examples/RandomWalkWithRestartVertexTest.java | 46 +++---- 10 files changed, 480 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml index 7a18711..6adcdcd 100644 --- a/giraph-examples/pom.xml +++ b/giraph-examples/pom.xml @@ -53,6 +53,7 @@ under the License. 
<headerLocation>license-header.txt</headerLocation> <failOnViolation>true</failOnViolation> <includeTestSourceDirectory>false</includeTestSourceDirectory> + <consoleOutput>true</consoleOutput> </configuration> <executions> <execution> http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java new file mode 100644 index 0000000..e22194a --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.examples; + +import com.google.common.collect.Lists; +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Input format for unweighted graphs with long ids and double vertex values + */ +public class LongDoubleNullDoubleTextInputFormat + extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable, + DoubleWritable> + implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable, + NullWritable, DoubleWritable> { + /** Configuration. */ + private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable, + NullWritable, DoubleWritable> conf; + + @Override + public TextVertexReader createVertexReader(InputSplit split, + TaskAttemptContext context) + throws IOException { + return new LongDoubleNullDoubleVertexReader(); + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration<LongWritable, + DoubleWritable, NullWritable, DoubleWritable> configuration) { + this.conf = configuration; + } + + @Override + public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable, + NullWritable, DoubleWritable> getConf() { + return conf; + } + + /** + * Vertex reader associated with + * {@link LongDoubleNullDoubleTextInputFormat}. 
+ */ + public class LongDoubleNullDoubleVertexReader extends + TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable, + DoubleWritable>.TextVertexReader { + /** Separator of the vertex and neighbors */ + private final Pattern separator = Pattern.compile("[\t ]"); + + @Override + public Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> + getCurrentVertex() throws IOException, InterruptedException { + Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> + vertex = conf.createVertex(); + + String[] tokens = + separator.split(getRecordReader().getCurrentValue().toString()); + List<Edge<LongWritable, NullWritable>> edges = + Lists.newArrayListWithCapacity(tokens.length - 1); + for (int n = 1; n < tokens.length; n++) { + edges.add(EdgeFactory.create( + new LongWritable(Long.parseLong(tokens[n])), + NullWritable.get())); + } + + LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0])); + vertex.initialize(vertexId, new DoubleWritable(), edges); + + return vertex; + } + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java new file mode 100644 index 0000000..733ee53 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.examples; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.utils.MathUtils; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +/** + * The PageRank algorithm, with uniform transition probabilities on the edges + * http://en.wikipedia.org/wiki/PageRank + */ +public class PageRankVertex extends RandomWalkVertex<NullWritable> { + + @Override + protected double transitionProbability(double stateProbability, + Edge<LongWritable, NullWritable> edge) { + return stateProbability / getNumEdges(); + } + + @Override + protected double recompute(Iterable<DoubleWritable> partialRanks, + double teleportationProbability) { + + // rank contribution from incident neighbors + double rankFromNeighbors = MathUtils.sum(partialRanks); + // rank contribution from dangling vertices + double danglingContribution = + getDanglingProbability() / getTotalNumVertices(); + + // recompute rank + double rank = (1d - teleportationProbability) * + (rankFromNeighbors + danglingContribution) + + teleportationProbability / getTotalNumVertices(); + + return rank; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java ---------------------------------------------------------------------- diff --git 
a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java index 8196523..85c6e27 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java @@ -24,34 +24,53 @@ import org.apache.giraph.edge.Edge; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.log4j.Logger; import java.io.IOException; /** - * Base class for executing a random walk on the graph + * Base class for executing a random walk on a graph + * + * @param <E> edge type */ -public abstract class RandomWalkVertex - extends Vertex<LongWritable, DoubleWritable, DoubleWritable, - DoubleWritable> { +public abstract class RandomWalkVertex<E extends Writable> + extends Vertex<LongWritable, DoubleWritable, E, DoubleWritable> { /** Configuration parameter for the number of supersteps to execute */ static final String MAX_SUPERSTEPS = RandomWalkVertex.class.getName() + ".maxSupersteps"; /** Configuration parameter for the teleportation probability */ static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class .getName() + ".teleportationProbability"; - /** Name of aggregator for dangling nodes */ - static final String DANGLING = "dangling"; + /** Name of aggregator for collecting the probability of dangling vertices */ + static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class + .getName() + ".cumulativeDanglingProbability"; + /** Name of aggregator for the L1 norm of the probability difference, used + * for convergence detection */ + static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class + .getName() + ".l1NormOfProbabilityDifference"; /** Logger */ private static final Logger LOG =
Logger.getLogger(RandomWalkVertex.class); - /** State probability of the vertex */ - protected final DoubleWritable d = new DoubleWritable(); + /** Reusable {@link DoubleWritable} instance to avoid object instantiation */ + private final DoubleWritable doubleWritable = new DoubleWritable(); /** - * Compute an initial probability distribution for the vertex. + * Compute an initial probability value for the vertex. Per default, + * we start with a uniform distribution. * @return The initial probability value. */ - protected abstract double initialProbability(); + protected double initialProbability() { + return 1.0 / getTotalNumVertices(); + } + + /** + * Compute the probability of transitioning to a neighbor vertex + * @param stateProbability current steady state probability of the vertex + * @param edge edge to neighbor + * @return the probability of transitioning to a neighbor vertex + */ + protected abstract double transitionProbability(double stateProbability, + Edge<LongWritable, E> edge); /** * Perform a single step of a random walk computation. @@ -63,30 +82,43 @@ public abstract class RandomWalkVertex protected abstract double recompute(Iterable<DoubleWritable> messages, double teleportationProbability); + /** + * Returns the cumulative probability from dangling nodes. + * @return The cumulative probability from dangling nodes. 
+ */ + protected double getDanglingProbability() { + return this.<DoubleWritable>getAggregatedValue( + RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get(); + } + @Override public void compute(Iterable<DoubleWritable> messages) throws IOException { double stateProbability; if (getSuperstep() > 0) { + double previousStateProbability = getValue().get(); stateProbability = recompute(messages, teleportationProbability()); + + doubleWritable.set(Math.abs(stateProbability - previousStateProbability)); + aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable); + } else { stateProbability = initialProbability(); } - d.set(stateProbability); - setValue(d); + doubleWritable.set(stateProbability); + setValue(doubleWritable); // Compute dangling node contribution for next superstep if (getNumEdges() == 0) { - aggregate(DANGLING, d); + aggregate(CUMULATIVE_DANGLING_PROBABILITY, doubleWritable); } - // Execute the algorithm as often as configured, - // alternatively convergence could be checked via an Aggregator if (getSuperstep() < maxSupersteps()) { - for (Edge<LongWritable, DoubleWritable> edge : getEdges()) { - double transitionProbability = stateProbability * edge.getValue().get(); - sendMessage(edge.getTargetVertexId(), new DoubleWritable( - transitionProbability)); + for (Edge<LongWritable, E> edge : getEdges()) { + double transitionProbability = + transitionProbability(stateProbability, edge); + doubleWritable.set(transitionProbability); + sendMessage(edge.getTargetVertexId(), doubleWritable); } } else { voteToHalt(); @@ -116,20 +148,38 @@ public abstract class RandomWalkVertex */ public static class RandomWalkVertexMasterCompute extends DefaultMasterCompute { + + /** threshold for the L1 norm of the state vector difference */ + static final double CONVERGENCE_THRESHOLD = 0.00001; + @Override public void compute() { - // TODO This is a good place to implement halting by checking convergence. 
double danglingContribution = - this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING) - .get(); + this.<DoubleWritable>getAggregatedValue( + RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get(); + double l1NormOfStateDiff = + this.<DoubleWritable>getAggregatedValue( + RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get(); + LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " + - danglingContribution); + danglingContribution + ", L1 Norm of state vector difference = " + + l1NormOfStateDiff); + + // Convergence check: halt once the L1 norm of the difference between the + // state vectors falls under the threshold + if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) { + haltComputation(); + } + } @Override public void initialize() throws InstantiationException, IllegalAccessException { - registerAggregator(RandomWalkVertex.DANGLING, DoubleSumAggregator.class); + registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY, + DoubleSumAggregator.class); + registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE, + DoubleSumAggregator.class); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java index 8a689ed..6f3eb6c 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java @@ -18,15 +18,19 @@ package org.apache.giraph.examples; +import com.google.common.base.Preconditions; +import org.apache.giraph.edge.Edge; import org.apache.giraph.utils.MathUtils; import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable; /** * Executes "RandomWalkWithRestart", a random walk on the graph which is biased * towards a source vertex. The resulting probabilities of staying at a given * vertex can be interpreted as a measure of proximity to the source vertex. */ -public class RandomWalkWithRestartVertex extends RandomWalkVertex { +public class RandomWalkWithRestartVertex + extends RandomWalkVertex<DoubleWritable> { /** Configuration parameter for the source vertex */ static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class @@ -42,34 +46,26 @@ public class RandomWalkWithRestartVertex extends RandomWalkVertex { } /** - * Returns the number of source vertexes. - * @return The number of source vertexes. + * Returns the number of source vertices. + * @return The number of source vertices. */ - private int numSourceVertexes() { + private int numSourceVertices() { return ((RandomWalkWorkerContext) getWorkerContext()).numSources(); } - /** - * Returns the cumulated probability from dangling nodes. - * @return The cumulated probability from dangling nodes. - */ - private double getDanglingProbability() { - return this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING) - .get(); - } - - /** - * Start with a uniform distribution. - * @return A uniform probability over all the vertexces. 
- */ @Override - protected double initialProbability() { - return 1.0 / getTotalNumVertices(); + protected double transitionProbability(double stateProbability, + Edge<LongWritable, DoubleWritable> edge) { + return stateProbability * edge.getValue().get(); } @Override protected double recompute(Iterable<DoubleWritable> transitionProbabilities, double teleportationProbability) { + + int numSourceVertices = numSourceVertices(); + Preconditions.checkState(numSourceVertices > 0, "No source vertex found"); + double stateProbability = MathUtils.sum(transitionProbabilities); // Add the contribution of dangling nodes (weakly preferential // implementation: dangling nodes redistribute uniformly) @@ -77,7 +73,7 @@ public class RandomWalkWithRestartVertex extends RandomWalkVertex { // The random walk might teleport back to one of the source vertexes stateProbability *= 1 - teleportationProbability; if (isSourceVertex()) { - stateProbability += teleportationProbability / numSourceVertexes(); + stateProbability += teleportationProbability / numSourceVertices; } return stateProbability; } http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java index 5cff23f..2566f43 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java @@ -103,18 +103,25 @@ public class RandomWalkWorkerContext extends WorkerContext { * Second option is a file with a list of vertex IDs, one per line. In this * second case the preference vector is a uniform distribution over these * vertexes. 
- * @param configuration - * The configuration. + * @param configuration The configuration. + * @return a (possibly empty) set of source vertices */ - private void initializeSources(Configuration configuration) { + private ImmutableSet<Long> initializeSources(Configuration configuration) { ImmutableSet.Builder<Long> builder = ImmutableSet.builder(); long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE); if (sourceVertex != Long.MIN_VALUE) { - builder.add(sourceVertex); + return ImmutableSet.of(sourceVertex); } else { Path sourceFile = null; try { - sourceFile = DistributedCache.getLocalCacheFiles(configuration)[0]; + + Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration); + if (cacheFiles == null || cacheFiles.length == 0) { + // empty set if no source vertices configured + return ImmutableSet.of(); + } + + sourceFile = cacheFiles[0]; FileSystem fs = FileSystem.getLocal(configuration); BufferedReader in = new BufferedReader(new InputStreamReader( fs.open(sourceFile))); @@ -124,25 +131,24 @@ public class RandomWalkWorkerContext extends WorkerContext { } in.close(); } catch (IOException e) { - e.printStackTrace(); getContext().setStatus( "Could not load local cache files: " + sourceFile); - LOG.error("Could not load local cache files: " + sourceFile); + LOG.error("Could not load local cache files: " + sourceFile, e); } } - SOURCES = builder.build(); + return builder.build(); } @Override public void preApplication() throws InstantiationException, IllegalAccessException { - Configuration configuration = this.getContext().getConfiguration(); + Configuration configuration = getContext().getConfiguration(); MAX_SUPERSTEPS = configuration.getInt(RandomWalkVertex.MAX_SUPERSTEPS, DEFAULT_MAX_SUPERSTEPS); TELEPORTATION_PROBABILITY = configuration.getFloat( RandomWalkVertex.TELEPORTATION_PROBABILITY, DEFAULT_TELEPORTATION_PROBABILITY); - initializeSources(configuration); + SOURCES = initializeSources(configuration); } @Override 
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java new file mode 100644 index 0000000..85f3556 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.examples; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexOutputFormat; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * Output format for vertices with a long as id, a double as value and + * null edges + */ +public class VertexWithDoubleValueNullEdgeTextOutputFormat extends + TextVertexOutputFormat<LongWritable, DoubleWritable, NullWritable> { + @Override + public TextVertexWriter createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new VertexWithDoubleValueWriter(); + } + + /** + * Vertex writer used with + * {@link VertexWithDoubleValueNullEdgeTextOutputFormat}. + */ + public class VertexWithDoubleValueWriter extends TextVertexWriter { + @Override + public void writeVertex( + Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex) + throws IOException, InterruptedException { + StringBuilder output = new StringBuilder(); + output.append(vertex.getId().get()); + output.append('\t'); + output.append(vertex.getValue().get()); + getRecordWriter().write(new Text(output.toString()), null); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java new file mode 100644 index 0000000..9672d20 --- /dev/null +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.examples; + +import com.google.common.collect.Maps; +import org.apache.giraph.conf.GiraphClasses; +import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + + +/** + * Tests for {@link PageRankVertex} + */ +public class PageRankVertexTest { + + /** + * A local integration test on toy data + */ + @Test + public void testToyData() throws Exception { + + // A small graph + String[] graph = new String[] { + "1 4 2 3", + "2 1", + "3", + "4 3 2", + "5 2 4" + }; + + Map<String, String> params = Maps.newHashMap(); + params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, String.valueOf(50)); + params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, + String.valueOf(0.15)); + + GiraphClasses<LongWritable, DoubleWritable, NullWritable, DoubleWritable> + classes = new GiraphClasses<LongWritable, DoubleWritable, + NullWritable, DoubleWritable>(); + classes.setVertexClass(PageRankVertex.class); + classes.setVertexEdgesClass(ByteArrayEdges.class); + 
classes.setVertexInputFormatClass( + LongDoubleNullDoubleTextInputFormat.class); + classes.setVertexOutputFormatClass( + VertexWithDoubleValueNullEdgeTextOutputFormat.class); + classes.setWorkerContextClass(RandomWalkWorkerContext.class); + classes.setMasterComputeClass( + RandomWalkVertex.RandomWalkVertexMasterCompute.class); + // Run internally + Iterable<String> results = InternalVertexRunner.run(classes, params, graph); + + Map<Long, Double> steadyStateProbabilities = + RandomWalkTestUtils.parseSteadyStateProbabilities(results); + + assertEquals(0.28159076008518047, steadyStateProbabilities.get(1l), + RandomWalkTestUtils.EPSILON); + assertEquals(0.2514648601529863, steadyStateProbabilities.get(2l), + RandomWalkTestUtils.EPSILON); + assertEquals(0.22262961972286327, steadyStateProbabilities.get(3l), + RandomWalkTestUtils.EPSILON); + assertEquals(0.17646783276703806, steadyStateProbabilities.get(4l), + RandomWalkTestUtils.EPSILON); + assertEquals(0.06784692727193153, steadyStateProbabilities.get(5l), + RandomWalkTestUtils.EPSILON); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java new file mode 100644 index 0000000..71528a3 --- /dev/null +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.examples; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class RandomWalkTestUtils { + + /** Minimum difference between doubles */ + public static final double EPSILON = 10e-3; + + /** + * Parse steady state probabilities. + * @param results The steady state probabilities in text format. + * @return A map representation of the steady state probabilities. 
+ */ + public static Map<Long, Double> parseSteadyStateProbabilities( + Iterable<String> results) { + Map<Long, Double> result = Maps.newHashMap(); + for (String s : results) { + String[] tokens = s.split("\\t"); + Long id = Long.parseLong(tokens[0]); + Double value = Double.parseDouble(tokens[1]); + result.put(id, value); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java index 489b35a..1ae9c52 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java @@ -37,9 +37,6 @@ import static org.junit.Assert.assertEquals; */ public class RandomWalkWithRestartVertexTest { - /** Minimum difference between doubles */ - private static final double EPSILON = 10e-3; - /** * A local integration test on toy data */ @@ -69,13 +66,15 @@ public class RandomWalkWithRestartVertexTest { Iterable<String> results = InternalVertexRunner.run(classes, params, graph); Map<Long, Double> steadyStateProbabilities = - parseSteadyStateProbabilities(results); + RandomWalkTestUtils.parseSteadyStateProbabilities(results); // values computed with external software // 0.25, 0.354872, 0.09375, 0.301377 - assertEquals(0.25, steadyStateProbabilities.get(12L), EPSILON); - assertEquals(0.354872, steadyStateProbabilities.get(34L), EPSILON); - assertEquals(0.09375, steadyStateProbabilities.get(56L), EPSILON); - assertEquals(0.301377, steadyStateProbabilities.get(78L), EPSILON); + assertEquals(0.25, steadyStateProbabilities.get(12L), RandomWalkTestUtils.EPSILON); + 
assertEquals(0.354872, steadyStateProbabilities.get(34L), + RandomWalkTestUtils.EPSILON); + assertEquals(0.09375, steadyStateProbabilities.get(56L), RandomWalkTestUtils.EPSILON); + assertEquals(0.301377, steadyStateProbabilities.get(78L), + RandomWalkTestUtils.EPSILON); } /** @@ -108,29 +107,18 @@ public class RandomWalkWithRestartVertexTest { Iterable<String> results = InternalVertexRunner.run(classes, params, graph); Map<Long, Double> steadyStateProbabilities = - parseSteadyStateProbabilities(results); + RandomWalkTestUtils.parseSteadyStateProbabilities(results); // values computed with external software // 0.163365, 0.378932, 0.156886, 0.300816 - assertEquals(0.163365, steadyStateProbabilities.get(12L), EPSILON); - assertEquals(0.378932, steadyStateProbabilities.get(34L), EPSILON); - assertEquals(0.156886, steadyStateProbabilities.get(56L), EPSILON); - assertEquals(0.300816, steadyStateProbabilities.get(78L), EPSILON); + assertEquals(0.163365, steadyStateProbabilities.get(12L), + RandomWalkTestUtils.EPSILON); + assertEquals(0.378932, steadyStateProbabilities.get(34L), + RandomWalkTestUtils.EPSILON); + assertEquals(0.156886, steadyStateProbabilities.get(56L), + RandomWalkTestUtils.EPSILON); + assertEquals(0.300816, steadyStateProbabilities.get(78L), + RandomWalkTestUtils.EPSILON); } - /** - * Parse steady state probabilities. - * @param results The steady state probabilities in text format. - * @return A map representation of the steady state probabilities. - */ - private Map<Long, Double> parseSteadyStateProbabilities( - Iterable<String> results) { - Map<Long, Double> result = Maps.newHashMap(); - for (String s : results) { - String[] tokens = s.split("\\t"); - Long id = Long.parseLong(tokens[0]); - Double value = Double.parseDouble(tokens[1]); - result.put(id, value); - } - return result; - } + }
