Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Thu Feb 16 22:12:31 2012 @@ -18,7 +18,8 @@ package org.apache.giraph.examples; -import org.apache.giraph.graph.*; +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.WorkerContext; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -36,153 +37,174 @@ import java.util.Iterator; * appropriate location and superstep. */ public class VerifyMessage { - public static class VerifiableMessage implements Writable { - /** Superstep sent on */ - public long superstep; - /** Source vertex id */ - public long sourceVertexId; - /** Value */ - public float value; - - public VerifiableMessage() {} - - public VerifiableMessage( - long superstep, long sourceVertexId, float value) { - this.superstep = superstep; - this.sourceVertexId = sourceVertexId; - this.value = value; - } + /** + * Message that will be sent in {@link VerifyMessageVertex}. + */ + public static class VerifiableMessage implements Writable { + /** Superstep sent on */ + private long superstep; + /** Source vertex id */ + private long sourceVertexId; + /** Value */ + private float value; + + /** + * Default constructor used with reflection. + */ + public VerifiableMessage() { } + + /** + * Constructor with verifiable arguments. + * @param superstep Superstep this message was created on. + * @param sourceVertexId Who sent this message. + * @param value A value associated with this message. 
+ */ + public VerifiableMessage( + long superstep, long sourceVertexId, float value) { + this.superstep = superstep; + this.sourceVertexId = sourceVertexId; + this.value = value; + } - @Override - public void readFields(DataInput input) throws IOException { - superstep = input.readLong(); - sourceVertexId = input.readLong(); - value = input.readFloat(); - } + @Override + public void readFields(DataInput input) throws IOException { + superstep = input.readLong(); + sourceVertexId = input.readLong(); + value = input.readFloat(); + } - @Override - public void write(DataOutput output) throws IOException { - output.writeLong(superstep); - output.writeLong(sourceVertexId); - output.writeFloat(value); - } + @Override + public void write(DataOutput output) throws IOException { + output.writeLong(superstep); + output.writeLong(sourceVertexId); + output.writeFloat(value); + } - @Override - public String toString() { - return "(superstep=" + superstep + ",sourceVertexId=" + - sourceVertexId + ",value=" + value + ")"; - } + @Override + public String toString() { + return "(superstep=" + superstep + ",sourceVertexId=" + + sourceVertexId + ",value=" + value + ")"; } + } - public static class VerifyMessageVertex extends - EdgeListVertex<LongWritable, IntWritable, FloatWritable, - VerifiableMessage> { - /** User can access this after the application finishes if local */ - public static long finalSum; - /** Number of supersteps to run (6 by default) */ - private static int supersteps = 6; - /** Class logger */ - private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class); - - /** Dynamically set number of supersteps */ - public static final String SUPERSTEP_COUNT = - "verifyMessageVertex.superstepCount"; - - public static class VerifyMessageVertexWorkerContext extends - WorkerContext { - @Override - public void preApplication() throws InstantiationException, - IllegalAccessException { - registerAggregator(LongSumAggregator.class.getName(), - LongSumAggregator.class); - 
LongSumAggregator sumAggregator = (LongSumAggregator) - getAggregator(LongSumAggregator.class.getName()); - sumAggregator.setAggregatedValue(new LongWritable(0)); - supersteps = getContext().getConfiguration().getInt( - SUPERSTEP_COUNT, supersteps); - } - - @Override - public void postApplication() { - LongSumAggregator sumAggregator = (LongSumAggregator) - getAggregator(LongSumAggregator.class.getName()); - finalSum = sumAggregator.getAggregatedValue().get(); - } - - @Override - public void preSuperstep() { - useAggregator(LongSumAggregator.class.getName()); - } + /** + * Send and verify messages. + */ + public static class VerifyMessageVertex extends + EdgeListVertex<LongWritable, IntWritable, FloatWritable, + VerifiableMessage> { + /** Dynamically set number of SUPERSTEPS */ + public static final String SUPERSTEP_COUNT = + "verifyMessageVertex.superstepCount"; + /** User can access this after the application finishes if local */ + private static long FINAL_SUM; + /** Number of SUPERSTEPS to run (6 by default) */ + private static int SUPERSTEPS = 6; + /** Class logger */ + private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class); - @Override - public void postSuperstep() {} - } + public static long getFinalSum() { + return FINAL_SUM; + } - @Override - public void compute(Iterator<VerifiableMessage> msgIterator) { - LongSumAggregator sumAggregator = (LongSumAggregator) - getAggregator(LongSumAggregator.class.getName()); - if (getSuperstep() > supersteps) { - voteToHalt(); - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("compute: " + sumAggregator); - } - sumAggregator.aggregate(getVertexId().get()); - if (LOG.isDebugEnabled()) { - LOG.debug("compute: sum = " + - sumAggregator.getAggregatedValue().get() + - " for vertex " + getVertexId()); - } - float msgValue = 0.0f; - while (msgIterator.hasNext()) { - VerifiableMessage msg = msgIterator.next(); - msgValue += msg.value; - if (LOG.isDebugEnabled()) { - LOG.debug("compute: got msg = " + msg + 
- " for vertex id " + getVertexId() + - ", vertex value " + getVertexValue() + - " on superstep " + getSuperstep()); - } - if (msg.superstep != getSuperstep() - 1) { - throw new IllegalStateException( - "compute: Impossible to not get a messsage from " + - "the previous superstep, current superstep = " + - getSuperstep()); - } - if ((msg.sourceVertexId != getVertexId().get() - 1) && - (getVertexId().get() != 0)) { - throw new IllegalStateException( - "compute: Impossible that this message didn't come " + - "from the previous vertex and came from " + - msg.sourceVertexId); - } - } - int vertexValue = getVertexValue().get(); - setVertexValue(new IntWritable(vertexValue + (int) msgValue)); - if (LOG.isDebugEnabled()) { - LOG.debug("compute: vertex " + getVertexId() + - " has value " + getVertexValue() + - " on superstep " + getSuperstep()); - } - for (LongWritable targetVertexId : this) { - FloatWritable edgeValue = getEdgeValue(targetVertexId); - if (LOG.isDebugEnabled()) { - LOG.debug("compute: vertex " + getVertexId() + - " sending edgeValue " + edgeValue + - " vertexValue " + vertexValue + - " total " + - (edgeValue.get() + (float) vertexValue) + - " to vertex " + targetVertexId + - " on superstep " + getSuperstep()); - } - edgeValue.set(edgeValue.get() + (float) vertexValue); - addEdge(targetVertexId, edgeValue); - sendMsg(targetVertexId, - new VerifiableMessage( - getSuperstep(), getVertexId().get(), edgeValue.get())); - } - } + /** + * Worker context used with {@link VerifyMessageVertex}. 
+ */ + public static class VerifyMessageVertexWorkerContext extends + WorkerContext { + @Override + public void preApplication() throws InstantiationException, + IllegalAccessException { + registerAggregator(LongSumAggregator.class.getName(), + LongSumAggregator.class); + LongSumAggregator sumAggregator = (LongSumAggregator) + getAggregator(LongSumAggregator.class.getName()); + sumAggregator.setAggregatedValue(new LongWritable(0)); + SUPERSTEPS = getContext().getConfiguration().getInt( + SUPERSTEP_COUNT, SUPERSTEPS); + } + + @Override + public void postApplication() { + LongSumAggregator sumAggregator = (LongSumAggregator) + getAggregator(LongSumAggregator.class.getName()); + FINAL_SUM = sumAggregator.getAggregatedValue().get(); + } + + @Override + public void preSuperstep() { + useAggregator(LongSumAggregator.class.getName()); + } + + @Override + public void postSuperstep() { } + } + + @Override + public void compute(Iterator<VerifiableMessage> msgIterator) { + LongSumAggregator sumAggregator = (LongSumAggregator) + getAggregator(LongSumAggregator.class.getName()); + if (getSuperstep() > SUPERSTEPS) { + voteToHalt(); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("compute: " + sumAggregator); + } + sumAggregator.aggregate(getVertexId().get()); + if (LOG.isDebugEnabled()) { + LOG.debug("compute: sum = " + + sumAggregator.getAggregatedValue().get() + + " for vertex " + getVertexId()); + } + float msgValue = 0.0f; + while (msgIterator.hasNext()) { + VerifiableMessage msg = msgIterator.next(); + msgValue += msg.value; + if (LOG.isDebugEnabled()) { + LOG.debug("compute: got msg = " + msg + + " for vertex id " + getVertexId() + + ", vertex value " + getVertexValue() + + " on superstep " + getSuperstep()); + } + if (msg.superstep != getSuperstep() - 1) { + throw new IllegalStateException( + "compute: Impossible to not get a messsage from " + + "the previous superstep, current superstep = " + + getSuperstep()); + } + if ((msg.sourceVertexId != getVertexId().get() 
- 1) && + (getVertexId().get() != 0)) { + throw new IllegalStateException( + "compute: Impossible that this message didn't come " + + "from the previous vertex and came from " + + msg.sourceVertexId); + } + } + int vertexValue = getVertexValue().get(); + setVertexValue(new IntWritable(vertexValue + (int) msgValue)); + if (LOG.isDebugEnabled()) { + LOG.debug("compute: vertex " + getVertexId() + + " has value " + getVertexValue() + + " on superstep " + getSuperstep()); + } + for (LongWritable targetVertexId : this) { + FloatWritable edgeValue = getEdgeValue(targetVertexId); + if (LOG.isDebugEnabled()) { + LOG.debug("compute: vertex " + getVertexId() + + " sending edgeValue " + edgeValue + + " vertexValue " + vertexValue + + " total " + + (edgeValue.get() + (float) vertexValue) + + " to vertex " + targetVertexId + + " on superstep " + getSuperstep()); + } + edgeValue.set(edgeValue.get() + (float) vertexValue); + addEdge(targetVertexId, edgeValue); + sendMsg(targetVertexId, + new VerifiableMessage( + getSuperstep(), getVertexId().get(), edgeValue.get())); + } } + } }
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java Thu Feb 16 22:12:31 2012 @@ -1,20 +1,20 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.giraph.examples; @@ -33,39 +33,44 @@ import java.io.IOException; * Text-based {@link org.apache.giraph.graph.VertexOutputFormat} for usage with * {@link ConnectedComponentsVertex} * - * Each line consists of a vertex and its associated component (represented by the smallest - * vertex id in the component) + * Each line consists of a vertex and its associated component (represented + * by the smallest vertex id in the component) */ public class VertexWithComponentTextOutputFormat extends - TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> { - - @Override - public VertexWriter<IntWritable, IntWritable, NullWritable> - createVertexWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - RecordWriter<Text, Text> recordWriter = - textOutputFormat.getRecordWriter(context); - return new VertexWithComponentWriter(recordWriter); + TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> { + @Override + public VertexWriter<IntWritable, IntWritable, NullWritable> + createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + RecordWriter<Text, Text> recordWriter = + textOutputFormat.getRecordWriter(context); + return new VertexWithComponentWriter(recordWriter); + } + + /** + * Vertex writer used with {@link VertexWithComponentTextOutputFormat}. + */ + public static class VertexWithComponentWriter extends + TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable, + NullWritable> { + /** + * Constructor with record writer. 
+ * + * @param writer Where the vertices will finally be written. + */ + public VertexWithComponentWriter(RecordWriter<Text, Text> writer) { + super(writer); } - public static class VertexWithComponentWriter extends - TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable, - NullWritable> { - - public VertexWithComponentWriter(RecordWriter<Text, Text> writer) { - super(writer); - } - - @Override - public void writeVertex(BasicVertex<IntWritable, IntWritable, - NullWritable,?> vertex) throws IOException, - InterruptedException { - StringBuilder output = new StringBuilder(); - output.append(vertex.getVertexId().get()); - output.append('\t'); - output.append(vertex.getVertexValue().get()); - getRecordWriter().write(new Text(output.toString()), null); - } - + @Override + public void writeVertex(BasicVertex<IntWritable, IntWritable, + NullWritable, ?> vertex) throws IOException, + InterruptedException { + StringBuilder output = new StringBuilder(); + output.append(vertex.getVertexId().get()); + output.append('\t'); + output.append(vertex.getVertexValue().get()); + getRecordWriter().write(new Text(output.toString()), null); } -} \ No newline at end of file + } +} Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java) URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java Thu Feb 16 22:12:31 2012 @@ -15,15 
+15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.giraph.bsp; - /** - * State of the BSP application + * Package of Giraph examples. */ -public enum ApplicationState { - UNKNOWN, ///< Shouldn't be seen, just an initial state - START_SUPERSTEP, ///< Start from a desired superstep - FAILED, ///< Unrecoverable - FINISHED ///< Successful completion -} +package org.apache.giraph.examples; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java Thu Feb 16 22:12:31 2012 @@ -24,39 +24,39 @@ import org.apache.hadoop.io.Writable; * Interface for Aggregator. Allows aggregate operations for all vertices * in a given superstep. * - * @param <A extends Writable> Aggregated value + * @param <A> Aggregated value */ public interface Aggregator<A extends Writable> { - /** - * Add a new value. - * Needs to be commutative and associative - * - * @param value - */ - void aggregate(A value); + /** + * Add a new value. + * Needs to be commutative and associative + * + * @param value Value to be aggregated. + */ + void aggregate(A value); - /** - * Set aggregated value. - * Can be used for initialization or reset. - * - * @param value - */ - void setAggregatedValue(A value); + /** + * Set aggregated value. + * Can be used for initialization or reset. + * + * @param value Value to be set. + */ + void setAggregatedValue(A value); - /** - * Return current aggregated value. - * Needs to be initialized if aggregate or setAggregatedValue - * have not been called before. 
- * - * @return A - */ - A getAggregatedValue(); + /** + * Return current aggregated value. + * Needs to be initialized if aggregate or setAggregatedValue + * have not been called before. + * + * @return Aggregated value + */ + A getAggregatedValue(); - /** - * Return new aggregated value. - * Must be changeable without affecting internals of Aggregator - * - * @return Writable - */ - A createAggregatedValue(); + /** + * Return new aggregated value. + * Must be changeable without affecting internals of Aggregator + * + * @return Writable + */ + A createAggregatedValue(); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java Thu Feb 16 22:12:31 2012 @@ -24,34 +24,35 @@ import org.apache.hadoop.io.Writable; * Vertex classes can use this interface to register and use aggregators */ public interface AggregatorUsage { - /** - * Register an aggregator in preSuperstep() and/or preApplication(). - * - * @param name of aggregator - * @param aggregatorClass Class type of the aggregator - * @return created Aggregator or null when already registered - */ - public <A extends Writable> Aggregator<A> registerAggregator( - String name, - Class<? extends Aggregator<A>> aggregatorClass) - throws InstantiationException, IllegalAccessException; + /** + * Register an aggregator in preSuperstep() and/or preApplication(). 
+ * + * @param <A> Aggregator type + * @param name of aggregator + * @param aggregatorClass Class type of the aggregator + * @return created Aggregator or null when already registered + */ + <A extends Writable> Aggregator<A> registerAggregator( + String name, + Class<? extends Aggregator<A>> aggregatorClass) + throws InstantiationException, IllegalAccessException; - /** - * Get a registered aggregator. - * - * @param name Name of aggregator - * @return Aggregator<A> (null when not registered) - */ - public Aggregator<? extends Writable> getAggregator(String name); + /** + * Get a registered aggregator. + * + * @param name Name of aggregator + * @return Aggregator (null when not registered) + */ + Aggregator<? extends Writable> getAggregator(String name); - /** - * Use a registered aggregator in current superstep. - * Even when the same aggregator should be used in the next - * superstep, useAggregator needs to be called at the beginning - * of that superstep in preSuperstep(). - * - * @param name Name of aggregator - * @return boolean (false when not registered) - */ - public boolean useAggregator(String name); + /** + * Use a registered aggregator in current superstep. + * Even when the same aggregator should be used in the next + * superstep, useAggregator needs to be called at the beginning + * of that superstep in preSuperstep(). 
+ * + * @param name Name of aggregator + * @return boolean (false when not registered) + */ + boolean useAggregator(String name); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java Thu Feb 16 22:12:31 2012 @@ -25,49 +25,49 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper.Context; /** - * An AggregatorWriter is used to export Aggregators during or at the end of + * An AggregatorWriter is used to export Aggregators during or at the end of * each computation. It runs on the master and it's called at the end of each - * superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is - * passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the + * superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is + * passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the * superstep value to signal the end of computation. */ public interface AggregatorWriter { - /** Signal for last superstep */ - public static final int LAST_SUPERSTEP = -1; + /** Signal for last superstep */ + int LAST_SUPERSTEP = -1; - /** - * The method is called at the initialization of the AggregatorWriter. - * More precisely, the aggregatorWriter is initialized each time a new - * master is elected. 
- * - * @param context Mapper Context where the master is running on - * @param applicationAttempt ID of the applicationAttempt, used to - * disambiguate aggregator writes for different attempts - * @throws IOException - */ - @SuppressWarnings("rawtypes") - void initialize(Context context, long applicationAttempt) throws IOException; + /** + * The method is called at the initialization of the AggregatorWriter. + * More precisely, the aggregatorWriter is initialized each time a new + * master is elected. + * + * @param context Mapper Context where the master is running on + * @param applicationAttempt ID of the applicationAttempt, used to + * disambiguate aggregator writes for different attempts + * @throws IOException + */ + @SuppressWarnings("rawtypes") + void initialize(Context context, long applicationAttempt) throws IOException; - /** - * The method is called at the end of each superstep. The user might decide - * whether to write the aggregators values for the current superstep. For - * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed. - * - * @param aggregatorMap Map of aggregators to write - * @param superstep Current superstep - * @throws IOException - */ - void writeAggregator( - Map<String, Aggregator<Writable>> aggregatorMap, - long superstep) throws IOException; + /** + * The method is called at the end of each superstep. The user might decide + * whether to write the aggregators values for the current superstep. For + * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed. + * + * @param aggregatorMap Map of aggregators to write + * @param superstep Current superstep + * @throws IOException + */ + void writeAggregator( + Map<String, Aggregator<Writable>> aggregatorMap, + long superstep) throws IOException; - /** - * The method is called at the end of a successful computation. The method - * is not called when the job fails and a new master is elected. 
For this - * reason it's advised to flush data at the end of - * {@link AggregatorWriter#writeAggregator(Map, long)}. - * - * @throws IOException - */ - void close() throws IOException; + /** + * The method is called at the end of a successful computation. The method + * is not called when the job fails and a new master is elected. For this + * reason it's advised to flush data at the end of + * {@link AggregatorWriter#writeAggregator(Map, long)}. + * + * @throws IOException + */ + void close() throws IOException; } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Thu Feb 16 22:12:31 2012 @@ -1,4 +1,4 @@ - /* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,249 +28,257 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; - /** +/** * Basic interface for writing a BSP application for computation. 
* - * @param <I> vertex id - * @param <V> vertex data - * @param <E> edge data - * @param <M> message data + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data */ @SuppressWarnings("rawtypes") public abstract class BasicVertex<I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> - implements AggregatorUsage, Iterable<I>, Writable, Configurable { - /** Global graph state **/ - private GraphState<I,V,E,M> graphState; - /** Configuration */ - private Configuration conf; - /** If true, do not do anymore computation on this vertex. */ - boolean halt = false; - - /** - * This method must be called after instantiation of a vertex with BspUtils - * unless deserialization from readFields() is called. - * - * @param vertexId Will be the vertex id - * @param vertexValue Will be the vertex value - * @param edges A map of destination edge ids to edge values (can be null) - * @param messages Initial messages for this vertex (can be null) - */ - public abstract void initialize( - I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages); - - /** - * Must be defined by user to do computation on a single Vertex. - * - * @param msgIterator Iterator to the messages that were sent to this - * vertex in the previous superstep - * @throws IOException - */ - public abstract void compute(Iterator<M> msgIterator) throws IOException; - - /** - * Retrieves the current superstep. 
- * - * @return Current superstep - */ - public long getSuperstep() { - return getGraphState().getSuperstep(); - } - - /** - * Get the vertex id - */ - public abstract I getVertexId(); - - /** - * Get the vertex value (data stored with vertex) - * - * @return Vertex value - */ - public abstract V getVertexValue(); - - /** - * Set the vertex data (immediately visible in the computation) - * - * @param vertexValue Vertex data to be set - */ - public abstract void setVertexValue(V vertexValue); - - /** - * Get the total (all workers) number of vertices that - * existed in the previous superstep. - * - * @return Total number of vertices (-1 if first superstep) - */ - public long getNumVertices() { - return getGraphState().getNumVertices(); - } - - /** - * Get the total (all workers) number of edges that - * existed in the previous superstep. - * - * @return Total number of edges (-1 if first superstep) - */ - public long getNumEdges() { - return getGraphState().getNumEdges(); - } - - /** - * Get a read-only view of the out-edges of this vertex. - * - * @return the out edges (sort order determined by subclass implementation). - */ - @Override - public abstract Iterator<I> iterator(); - - /** - * Get the edge value associated with a target vertex id. - * - * @param targetVertexId Target vertex id to check - * - * @return the value of the edge to targetVertexId (or null if there - * is no edge to it) - */ - public abstract E getEdgeValue(I targetVertexId); - - /** - * Does an edge with the target vertex id exist? - * - * @param targetVertexId Target vertex id to check - * @return true if there is an edge to the target - */ - public abstract boolean hasEdge(I targetVertexId); - - /** - * Get the number of outgoing edges on this vertex. - * - * @return the total number of outbound edges from this vertex - */ - public abstract int getNumOutEdges(); - - /** - * Send a message to a vertex id. 
The message should not be mutated after - * this method returns or else undefined results could occur. - * - * @param id Vertex id to send the message to - * @param msg Message data to send. Note that after the message is sent, - * the user should not modify the object. - */ - public void sendMsg(I id, M msg) { - if (msg == null) { - throw new IllegalArgumentException( - "sendMsg: Cannot send null message to " + id); - } - getGraphState().getWorkerCommunications(). - sendMessageReq(id, msg); - } - - /** - * Send a message to all edges. - */ - public abstract void sendMsgToAllEdges(M msg); - - /** - * After this is called, the compute() code will no longer be called for - * this vertex unless a message is sent to it. Then the compute() code - * will be called once again until this function is called. The - * application finishes only when all vertices vote to halt. - */ - public void voteToHalt() { - halt = true; - } - - /** - * Is this vertex done? - */ - public boolean isHalted() { - return halt; - } - - /** - * Get the list of incoming messages from the previous superstep. Same as - * the message iterator passed to compute(). - */ - public abstract Iterable<M> getMessages(); - - /** - * Copy the messages this vertex should process in the current superstep - * - * @param messages the messages sent to this vertex in the previous superstep - */ - abstract void putMessages(Iterable<M> messages); - - /** - * Release unnecessary resources (will be called after vertex returns from - * {@link #compute()}) - */ - abstract void releaseResources(); - - /** - * Get the graph state for all workers. 
- * - * @return Graph state for all workers - */ - GraphState<I, V, E, M> getGraphState() { - return graphState; - } - - /** - * Set the graph state for all workers - * - * @param graphState Graph state for all workers - */ - void setGraphState(GraphState<I, V, E, M> graphState) { - this.graphState = graphState; - } - - /** - * Get the mapper context - * - * @return Mapper context - */ - public Mapper.Context getContext() { - return getGraphState().getContext(); - } - - /** - * Get the worker context - * - * @return WorkerContext context - */ - public WorkerContext getWorkerContext() { - return getGraphState().getGraphMapper().getWorkerContext(); - } - - @Override - public final <A extends Writable> Aggregator<A> registerAggregator( - String name, - Class<? extends Aggregator<A>> aggregatorClass) - throws InstantiationException, IllegalAccessException { - return getGraphState().getGraphMapper().getAggregatorUsage(). - registerAggregator(name, aggregatorClass); - } - - @Override - public final Aggregator<? extends Writable> getAggregator(String name) { - return getGraphState().getGraphMapper().getAggregatorUsage(). - getAggregator(name); - } - - @Override - public final boolean useAggregator(String name) { - return getGraphState().getGraphMapper().getAggregatorUsage(). - useAggregator(name); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } + V extends Writable, E extends Writable, M extends Writable> + implements AggregatorUsage, Iterable<I>, Writable, Configurable { + /** If true, do not do anymore computation on this vertex. */ + protected boolean halt = false; + /** Global graph state **/ + private GraphState<I, V, E, M> graphState; + /** Configuration */ + private Configuration conf; + + + /** + * This method must be called after instantiation of a vertex with BspUtils + * unless deserialization from readFields() is called. 
+ * + * @param vertexId Will be the vertex id + * @param vertexValue Will be the vertex value + * @param edges A map of destination edge ids to edge values (can be null) + * @param messages Initial messages for this vertex (can be null) + */ + public abstract void initialize( + I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages); + + /** + * Must be defined by user to do computation on a single Vertex. + * + * @param msgIterator Iterator to the messages that were sent to this + * vertex in the previous superstep + * @throws IOException + */ + public abstract void compute(Iterator<M> msgIterator) throws IOException; + + /** + * Retrieves the current superstep. + * + * @return Current superstep + */ + public long getSuperstep() { + return getGraphState().getSuperstep(); + } + + /** + * Get the vertex id. + * + * @return My vertex id. + */ + public abstract I getVertexId(); + + /** + * Get the vertex value (data stored with vertex) + * + * @return Vertex value + */ + public abstract V getVertexValue(); + + /** + * Set the vertex data (immediately visible in the computation) + * + * @param vertexValue Vertex data to be set + */ + public abstract void setVertexValue(V vertexValue); + + /** + * Get the total (all workers) number of vertices that + * existed in the previous superstep. + * + * @return Total number of vertices (-1 if first superstep) + */ + public long getNumVertices() { + return getGraphState().getNumVertices(); + } + + /** + * Get the total (all workers) number of edges that + * existed in the previous superstep. + * + * @return Total number of edges (-1 if first superstep) + */ + public long getNumEdges() { + return getGraphState().getNumEdges(); + } + + /** + * Get a read-only view of the out-edges of this vertex. + * + * @return the out edges (sort order determined by subclass implementation). + */ + @Override + public abstract Iterator<I> iterator(); + + /** + * Get the edge value associated with a target vertex id. 
+ * + * @param targetVertexId Target vertex id to check + * + * @return the value of the edge to targetVertexId (or null if there + * is no edge to it) + */ + public abstract E getEdgeValue(I targetVertexId); + + /** + * Does an edge with the target vertex id exist? + * + * @param targetVertexId Target vertex id to check + * @return true if there is an edge to the target + */ + public abstract boolean hasEdge(I targetVertexId); + + /** + * Get the number of outgoing edges on this vertex. + * + * @return the total number of outbound edges from this vertex + */ + public abstract int getNumOutEdges(); + + /** + * Send a message to a vertex id. The message should not be mutated after + * this method returns or else undefined results could occur. + * + * @param id Vertex id to send the message to + * @param msg Message data to send. Note that after the message is sent, + * the user should not modify the object. + */ + public void sendMsg(I id, M msg) { + if (msg == null) { + throw new IllegalArgumentException( + "sendMsg: Cannot send null message to " + id); + } + getGraphState().getWorkerCommunications(). + sendMessageReq(id, msg); + } + + /** + * Send a message to all edges. + * + * @param msg Message sent to all edges. + */ + public abstract void sendMsgToAllEdges(M msg); + + /** + * After this is called, the compute() code will no longer be called for + * this vertex unless a message is sent to it. Then the compute() code + * will be called once again until this function is called. The + * application finishes only when all vertices vote to halt. + */ + public void voteToHalt() { + halt = true; + } + + /** + * Is this vertex done? + * + * @return True if halted, false otherwise. + */ + public boolean isHalted() { + return halt; + } + + /** + * Get the list of incoming messages from the previous superstep. Same as + * the message iterator passed to compute(). + * + * @return Iterator of messages. 
+ */ + public abstract Iterable<M> getMessages(); + + /** + * Copy the messages this vertex should process in the current superstep + * + * @param messages the messages sent to this vertex in the previous superstep + */ + abstract void putMessages(Iterable<M> messages); + + /** + * Release unnecessary resources (will be called after vertex returns from + * {@link #compute()}) + */ + abstract void releaseResources(); + + /** + * Get the graph state for all workers. + * + * @return Graph state for all workers + */ + GraphState<I, V, E, M> getGraphState() { + return graphState; + } + + /** + * Set the graph state for all workers + * + * @param graphState Graph state for all workers + */ + void setGraphState(GraphState<I, V, E, M> graphState) { + this.graphState = graphState; + } + + /** + * Get the mapper context + * + * @return Mapper context + */ + public Mapper.Context getContext() { + return getGraphState().getContext(); + } + + /** + * Get the worker context + * + * @return WorkerContext context + */ + public WorkerContext getWorkerContext() { + return getGraphState().getGraphMapper().getWorkerContext(); + } + + @Override + public final <A extends Writable> Aggregator<A> registerAggregator( + String name, Class<? extends Aggregator<A>> aggregatorClass) + throws InstantiationException, IllegalAccessException { + return getGraphState().getGraphMapper().getAggregatorUsage(). + registerAggregator(name, aggregatorClass); + } + + @Override + public final Aggregator<? extends Writable> getAggregator(String name) { + return getGraphState().getGraphMapper().getAggregatorUsage(). + getAggregator(name); + } + + @Override + public final boolean useAggregator(String name) { + return getGraphState().getGraphMapper().getAggregatorUsage(). 
+ useAggregator(name); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Thu Feb 16 22:12:31 2012 @@ -24,36 +24,38 @@ import org.apache.hadoop.io.WritableComp /** * Handles all the situations that can arise upon creation/removal of * vertices and edges. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data */ @SuppressWarnings("rawtypes") -public interface BasicVertexResolver< - I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> { - /** - * A vertex may have been removed, created zero or more times and had - * zero or more messages sent to it. This method will handle all situations - * excluding the normal case (a vertex already exists and has zero or more - * messages sent it to). 
- * - * @param vertexId Vertex id (can be used for {@link BasicVertex}'s - * initialize()) - * @param vertex Original vertex or null if none - * @param vertexChanges Changes that happened to this vertex or null if none - * @param messages messages received in the last superstep or null if none - * @return Vertex to be returned, if null, and a vertex currently exists - * it will be removed - */ - BasicVertex<I, V, E, M> resolve(I vertexId, - BasicVertex<I, V, E, M> vertex, - VertexChanges<I, V, E, M> vertexChanges, - Iterable<M> messages); +public interface BasicVertexResolver<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> { + /** + * A vertex may have been removed, created zero or more times and had + * zero or more messages sent to it. This method will handle all situations + * excluding the normal case (a vertex already exists and has zero or more + * messages sent it to). + * + * @param vertexId Vertex id (can be used for {@link BasicVertex}'s + * initialize()) + * @param vertex Original vertex or null if none + * @param vertexChanges Changes that happened to this vertex or null if none + * @param messages messages received in the last superstep or null if none + * @return Vertex to be returned, if null, and a vertex currently exists + * it will be removed + */ + BasicVertex<I, V, E, M> resolve(I vertexId, + BasicVertex<I, V, E, M> vertex, + VertexChanges<I, V, E, M> vertexChanges, + Iterable<M> messages); - /** - * Create a default vertex that can be used to return from resolve(). - * - * @return Newly instantiated vertex. - */ - BasicVertex<I, V, E, M> instantiateVertex(); + /** + * Create a default vertex that can be used to return from resolve(). + * + * @return Newly instantiated vertex. + */ + BasicVertex<I, V, E, M> instantiateVertex(); }
