Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java?rev=1449666&view=auto ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java (added) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java Mon Feb 25 11:40:13 2013 @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPPeer; + +import com.google.common.collect.AbstractIterator; + +/** + * The rationale behind this class is that it polls messages if they are + * requested and once it finds a message that is not dedicated for this vertex, + * it breaks the iteration. The message that was polled and doesn't belong to + * the vertex is returned by {@link #getOverflowMessage()}. 
+ */ +public final class VertexMessageIterable<V, T> implements Iterable<T> { + + private final V vertexID; + private final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer; + + private GraphJobMessage overflow; + private GraphJobMessage currentMessage; + + private Iterator<T> currentIterator; + + public VertexMessageIterable(GraphJobMessage currentMessage, V vertexID, + BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) { + this.currentMessage = currentMessage; + this.vertexID = vertexID; + this.peer = peer; + setupIterator(); + } + + private void setupIterator() { + currentIterator = new AbstractIterator<T>() { + @SuppressWarnings("unchecked") + @Override + protected T computeNext() { + // spool back the current message + if (currentMessage != null) { + GraphJobMessage tmp = currentMessage; + // set it to null, so we don't send it over and over again + currentMessage = null; + return (T) tmp.getVertexValue(); + } + + try { + GraphJobMessage msg = peer.getCurrentMessage(); + if (msg != null) { + if (msg.getVertexId().equals(vertexID)) { + return (T) msg.getVertexValue(); + } else { + overflow = msg; + } + } + return endOfData(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + + public GraphJobMessage getOverflowMessage() { + // check if iterable was completely consumed + while (currentIterator.hasNext()) { + currentIterator.next(); + } + return overflow; + } + + @Override + public Iterator<T> iterator() { + return currentIterator; + } + +}
Copied: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (from r1449611, hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java) URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?p2=hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java&p1=hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java&r1=1449611&r2=1449666&rev=1449666&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Mon Feb 25 11:40:13 2013 @@ -17,12 +17,12 @@ */ package org.apache.hama.graph; -import java.io.DataInput; -import java.io.DataOutput; -import java.util.Iterator; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.TaskAttemptID; /** * VerticesInfo interface encapsulates the storage of vertices in a BSP Task. @@ -31,13 +31,52 @@ import org.apache.hadoop.io.WritableComp * @param <E> Edge cost object type * @param <M> Vertex value object type */ -public interface IVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable> - extends Iterable<Vertex<V, E, M>> { +public interface VerticesInfo<V extends WritableComparable<? super V>, E extends Writable, M extends Writable> { + + /** + * Initialization of internal structures. + */ + public void init(GraphJobRunner<V, E, M> runner, Configuration conf, + TaskAttemptID attempt) throws IOException; + + /** + * Cleanup of internal structures. + */ + public void cleanup(Configuration conf, TaskAttemptID attempt) + throws IOException; /** * Add a vertex to the underlying structure. 
*/ - public void addVertex(Vertex<V, E, M> vertex); + public void addVertex(Vertex<V, E, M> vertex) throws IOException; + + /** + * Finish the additions, from this point on the implementations should close + * the adds and throw exceptions in case something is added after this call. + */ + public void finishAdditions(); + + /** + * Called once a superstep starts. + */ + public void startSuperstep() throws IOException; + + /** + * Called once a superstep has completed. + */ + public void finishSuperstep() throws IOException; + + /** + * Must be called once a vertex is guaranteed not to change any more and can + * safely be persisted to a secondary storage. + */ + public void finishVertexComputation(Vertex<V, E, M> vertex) + throws IOException; + + /** + * @return true if all vertices are added. + */ + public boolean isFinishedAdditions(); /** * @return the number of vertices added to the underlying structure. @@ -45,12 +84,6 @@ public interface IVerticesInfo<V extends */ public int size(); - @Override - public Iterator<Vertex<V, E, M>> iterator(); - - // to be added and documented soon - public void recoverState(DataInput in); - - public void saveState(DataOutput out); + public IDSkippingIterator<V, E, M> skippingIterator(); } Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java?rev=1449666&view=auto ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java (added) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java Mon Feb 25 11:40:13 2013 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.DoubleWritable; +import org.junit.Test; + +public class TestAbsDiffAggregator extends TestCase { + + @Test + public void testAggregator() { + AbsDiffAggregator diff = new AbsDiffAggregator(); + diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregate(null, null, new DoubleWritable(5)); + + // 0, because this is totally worthless for diffs + assertEquals(0, diff.getTimesAggregated().get()); + assertEquals(6, (int) diff.getValue().get()); + + } + +} Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java?rev=1449666&view=auto ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java (added) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java Mon Feb 25 11:40:13 2013 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.DoubleWritable; +import org.junit.Test; + +public class TestAverageAggregator extends TestCase { + + @Test + public void testAggregator() { + AverageAggregator diff = new AverageAggregator(); + diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregateInternal(); + diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregateInternal(); + diff.aggregate(null, null, new DoubleWritable(5)); + diff.aggregateInternal(); + + assertEquals(3, diff.getTimesAggregated().get()); + DoubleWritable x = diff.finalizeAggregation(); + assertEquals(2, (int) x.get()); + + } + +} Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java?rev=1449666&view=auto ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java (added) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java Mon Feb 25 11:40:13 2013 @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.graph.example.PageRank; +import org.apache.hama.graph.example.PageRank.PageRankVertex; +import org.junit.Test; + +public class TestDiskVerticesInfo extends TestCase { + + @Test + public void testDiskVerticesInfoLifeCycle() throws Exception { + DiskVerticesInfo<Text, NullWritable, DoubleWritable> info = new DiskVerticesInfo<Text, NullWritable, DoubleWritable>(); + Configuration conf = new Configuration(); + conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName()); + conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, + NullWritable.class.getName()); + conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName()); + conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName()); + GraphJobRunner.<Text, NullWritable, DoubleWritable> initClasses(conf); + TaskAttemptID attempt = new TaskAttemptID("omg", 1, 1, 0); + try { + ArrayList<PageRankVertex> list = new ArrayList<PageRankVertex>(); + + for (int i = 
0; i < 10; i++) { + PageRankVertex v = new PageRank.PageRankVertex(); + v.setVertexID(new Text(i + "")); + if (i % 2 == 0) { + v.setValue(new DoubleWritable(i * 2)); + } + v.addEdge(new Edge<Text, NullWritable>(new Text((10 - i) + ""), null)); + + list.add(v); + } + + info.init(null, conf, attempt); + for (PageRankVertex v : list) { + info.addVertex(v); + } + + info.finishAdditions(); + + assertEquals(10, info.size()); + // now we want to iterate and check if the result can properly be obtained + + int index = 0; + IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info + .skippingIterator(); + while (iterator.hasNext()) { + Vertex<Text, NullWritable, DoubleWritable> next = iterator.next(); + PageRankVertex pageRankVertex = list.get(index); + assertEquals(pageRankVertex.getVertexID().toString(), next + .getVertexID().toString()); + if (index % 2 == 0) { + assertEquals((int) next.getValue().get(), index * 2); + } else { + assertNull(next.getValue()); + } + assertEquals(next.isHalted(), false); + // check edges + List<Edge<Text, NullWritable>> edges = next.getEdges(); + assertEquals(1, edges.size()); + Edge<Text, NullWritable> edge = edges.get(0); + assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID() + .toString(), edge.getDestinationVertexID().toString()); + assertNull(edge.getValue()); + + index++; + } + assertEquals(index, list.size()); + info.finishSuperstep(); + // iterate again and compute so vertices change internally + iterator = info.skippingIterator(); + info.startSuperstep(); + while (iterator.hasNext()) { + Vertex<Text, NullWritable, DoubleWritable> next = iterator.next(); + // override everything with constant 2 + next.setValue(new DoubleWritable(2)); + if (Integer.parseInt(next.getVertexID().toString()) == 3) { + next.voteToHalt(); + } + info.finishVertexComputation(next); + } + info.finishSuperstep(); + + index = 0; + // now reread + info.startSuperstep(); + iterator = info.skippingIterator(); + while 
(iterator.hasNext()) { + Vertex<Text, NullWritable, DoubleWritable> next = iterator.next(); + PageRankVertex pageRankVertex = list.get(index); + assertEquals(pageRankVertex.getVertexID().toString(), next + .getVertexID().toString()); + assertEquals((int) next.getValue().get(), 2); + // check edges + List<Edge<Text, NullWritable>> edges = next.getEdges(); + assertEquals(1, edges.size()); + Edge<Text, NullWritable> edge = edges.get(0); + assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID() + .toString(), edge.getDestinationVertexID().toString()); + assertNull(edge.getValue()); + if (index == 3) { + assertEquals(true, next.isHalted()); + } + + index++; + } + assertEquals(index, list.size()); + + } finally { + info.cleanup(conf, attempt); + } + + } +} Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java?rev=1449666&view=auto ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (added) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java Mon Feb 25 11:40:13 2013 @@ -0,0 +1,50 @@ +package org.apache.hama.graph; + +import java.util.List; +import java.util.PriorityQueue; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestGraphJobMessage extends TestCase { + + @Test + public void testPriorityQueue() { + PriorityQueue<GraphJobMessage> prio = new PriorityQueue<GraphJobMessage>(); + prio.addAll(getMessages()); + + GraphJobMessage poll = prio.poll(); + assertEquals(true, poll.isMapMessage()); + poll = prio.poll(); + assertEquals(true, poll.isVertexMessage()); + assertEquals("1", 
poll.getVertexId().toString()); + + poll = prio.poll(); + assertEquals(true, poll.isVertexMessage()); + assertEquals("2", poll.getVertexId().toString()); + + poll = prio.poll(); + assertEquals(true, poll.isVertexMessage()); + assertEquals("3", poll.getVertexId().toString()); + + assertTrue(prio.isEmpty()); + } + + public List<GraphJobMessage> getMessages() { + GraphJobMessage mapMsg = new GraphJobMessage(new MapWritable()); + GraphJobMessage vertexMsg1 = new GraphJobMessage(new Text("1"), + new IntWritable()); + GraphJobMessage vertexMsg2 = new GraphJobMessage(new Text("2"), + new IntWritable()); + GraphJobMessage vertexMsg3 = new GraphJobMessage(new Text("3"), + new IntWritable()); + return Lists.newArrayList(mapMsg, vertexMsg1, vertexMsg2, vertexMsg3); + } + +} Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java?rev=1449666&view=auto ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java (added) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java Mon Feb 25 11:40:13 2013 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.IntWritable; +import org.junit.Test; + +public class TestMinMaxAggregator extends TestCase { + + @Test + public void testMinAggregator() { + MinAggregator diff = new MinAggregator(); + diff.aggregate(null, new IntWritable(5)); + diff.aggregate(null, new IntWritable(25)); + assertEquals(5, diff.getValue().get()); + + } + + @Test + public void testMaxAggregator() { + MaxAggregator diff = new MaxAggregator(); + diff.aggregate(null, new IntWritable(5)); + diff.aggregate(null, new IntWritable(25)); + assertEquals(25, diff.getValue().get()); + } + +} Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1449666&r1=1449665&r2=1449666&view=diff ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Feb 25 11:40:13 2013 @@ -44,12 +44,11 @@ public class TestSubmitGraphJob extends "yahoo.com\tnasa.gov\tstackoverflow.com", "twitter.com\tgoogle.com\tfacebook.com", "nasa.gov\tyahoo.com\tstackoverflow.com", - "youtube.com\tgoogle.com\tyahoo.com" }; + "youtube.com\tgoogle.com\tyahoo.com", "google.com" }; private static String INPUT = "/tmp/pagerank/real-tmp.seq"; private static String OUTPUT = "/tmp/pagerank/real-out"; - @SuppressWarnings("unchecked") @Override public void testSubmitJob() throws Exception { @@ -60,6 +59,7 @@ public class TestSubmitGraphJob extends bsp.setOutputPath(new Path(OUTPUT)); BSPJobClient jobClient = new BSPJobClient(configuration); configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000); + 
configuration.set("hama.graph.self.ref", "true"); ClusterStatus cluster = jobClient.getClusterStatus(false); assertEquals(this.numOfGroom, cluster.getGroomServers()); LOG.info("Client finishes execution job."); @@ -67,11 +67,8 @@ public class TestSubmitGraphJob extends bsp.setVertexClass(PageRank.PageRankVertex.class); // set the defaults bsp.setMaxIteration(30); - // FIXME why is the sum correct when 1-ALPHA instead of ALPHA itself? - bsp.set("hama.pagerank.alpha", "0.25"); - bsp.setAggregatorClass(AverageAggregator.class, - PageRank.DanglingNodeAggregator.class); + bsp.setAggregatorClass(AverageAggregator.class); bsp.setInputFormat(SequenceFileInputFormat.class); bsp.setInputKeyClass(Text.class); Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java?rev=1449666&view=auto ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java (added) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java Mon Feb 25 11:40:13 2013 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.DoubleWritable; +import org.junit.Test; + +public class TestSumAggregator extends TestCase { + + @Test + public void testAggregator() { + SumAggregator diff = new SumAggregator(); + diff.aggregate(null, new DoubleWritable(5)); + diff.aggregate(null, new DoubleWritable(5)); + assertEquals(10, (int) diff.getValue().get()); + + } + +} Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1449666&r1=1449665&r2=1449666&view=diff ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Mon Feb 25 11:40:13 2013 @@ -18,7 +18,6 @@ package org.apache.hama.graph.example; import java.io.IOException; -import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -31,7 +30,6 @@ import org.apache.hama.bsp.HashPartition import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.bsp.TextOutputFormat; -import org.apache.hama.graph.AbstractAggregator; import org.apache.hama.graph.AverageAggregator; import org.apache.hama.graph.Edge; import org.apache.hama.graph.GraphJob; @@ -49,8 +47,6 @@ public class PageRank { static double DAMPING_FACTOR = 0.85; static double MAXIMUM_CONVERGENCE_ERROR = 0.001; - int numEdges; - @Override public void setup(Configuration conf) { String val = conf.get("hama.pagerank.alpha"); @@ -61,30 +57,20 @@ public class PageRank { if (val != null) { MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val); } - numEdges = 
this.getEdges().size(); } @Override - public void compute(Iterator<DoubleWritable> messages) throws IOException { + public void compute(Iterable<DoubleWritable> messages) throws IOException { // initialize this vertex to 1 / count of global vertices in this graph if (this.getSuperstepCount() == 0) { this.setValue(new DoubleWritable(1.0 / this.getNumVertices())); } else if (this.getSuperstepCount() >= 1) { - DoubleWritable danglingNodeContribution = getLastAggregatedValue(1); double sum = 0; - while (messages.hasNext()) { - DoubleWritable msg = messages.next(); + for (DoubleWritable msg : messages) { sum += msg.get(); } - if (danglingNodeContribution == null) { - double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices(); - this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum))); - } else { - double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices(); - this.setValue(new DoubleWritable(alpha - + (DAMPING_FACTOR * (sum + danglingNodeContribution.get() - / this.getNumVertices())))); - } + double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices(); + this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR))); } // if we have not reached our global error yet, then proceed. 
@@ -94,34 +80,10 @@ public class PageRank { voteToHalt(); return; } + // in each superstep we are going to send a new rank to our neighbours sendMessageToNeighbors(new DoubleWritable(this.getValue().get() - / numEdges)); - } - - } - - public static class DanglingNodeAggregator - extends - AbstractAggregator<DoubleWritable, Vertex<Text, NullWritable, DoubleWritable>> { - - double danglingNodeSum; - - @Override - public void aggregate(Vertex<Text, NullWritable, DoubleWritable> vertex, - DoubleWritable value) { - if (vertex != null) { - if (vertex.getEdges().size() == 0) { - danglingNodeSum += value.get(); - } - } else { - danglingNodeSum += value.get(); - } - } - - @Override - public DoubleWritable getValue() { - return new DoubleWritable(danglingNodeSum); + / this.getEdges().size())); } } @@ -142,7 +104,6 @@ public class PageRank { } } - @SuppressWarnings("unchecked") public static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException { GraphJob pageJob = new GraphJob(conf, PageRank.class); @@ -155,15 +116,17 @@ public class PageRank { // set the defaults pageJob.setMaxIteration(30); pageJob.set("hama.pagerank.alpha", "0.85"); + // reference vertices to itself, because we don't have a dangling node + // contribution here + pageJob.set("hama.graph.self.ref", "true"); pageJob.set("hama.graph.max.convergence.error", "0.001"); if (args.length == 3) { pageJob.setNumBspTask(Integer.parseInt(args[2])); } - // error, dangling node probability sum - pageJob.setAggregatorClass(AverageAggregator.class, - DanglingNodeAggregator.class); + // error + pageJob.setAggregatorClass(AverageAggregator.class); // Vertex reader pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
