Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Thu Feb 16 22:12:31 2012 @@ -31,424 +31,479 @@ import org.apache.hadoop.util.Reflection * instantiate them. */ public class BspUtils { - /** - * Get the user's subclassed {@link GraphPartitionerFactory}. - * - * @param conf Configuration to check - * @return User's graph partitioner - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static <I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> - Class<? extends GraphPartitionerFactory<I, V, E, M>> - getGraphPartitionerClass(Configuration conf) { - return (Class<? extends GraphPartitionerFactory<I, V, E, M>>) - conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS, - HashPartitionerFactory.class, - GraphPartitionerFactory.class); - } - - /** - * Create a user graph partitioner class - * - * @param conf Configuration to check - * @return Instantiated user graph partitioner class - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> - GraphPartitionerFactory<I, V, E, M> - createGraphPartitioner(Configuration conf) { - Class<? extends GraphPartitionerFactory<I, V, E, M>> - graphPartitionerFactoryClass = - getGraphPartitionerClass(conf); - return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf); - } - - /** - * Create a user graph partitioner partition stats class - * - * @param conf Configuration to check - * @return Instantiated user graph partition stats class - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> - PartitionStats createGraphPartitionStats(Configuration conf) { - GraphPartitionerFactory<I, V, E, M> graphPartitioner = - createGraphPartitioner(conf); - return graphPartitioner.createMasterGraphPartitioner(). - createPartitionStats(); - } - - /** - * Get the user's subclassed {@link VertexInputFormat}. - * - * @param conf Configuration to check - * @return User's vertex input format class - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static <I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> - Class<? extends VertexInputFormat<I, V, E, M>> - getVertexInputFormatClass(Configuration conf) { - return (Class<? extends VertexInputFormat<I, V, E, M>>) - conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS, - null, - VertexInputFormat.class); - } - - /** - * Create a user vertex input format class - * - * @param conf Configuration to check - * @return Instantiated user vertex input format class - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> VertexInputFormat<I, V, E, M> - createVertexInputFormat(Configuration conf) { - Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = - getVertexInputFormatClass(conf); - VertexInputFormat<I, V, E, M> inputFormat = - ReflectionUtils.newInstance(vertexInputFormatClass, conf); - return inputFormat; - } - - /** - * Get the user's subclassed {@link VertexOutputFormat}. - * - * @param conf Configuration to check - * @return User's vertex output format class - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static <I extends WritableComparable, - V extends Writable, - E extends Writable> - Class<? extends VertexOutputFormat<I, V, E>> - getVertexOutputFormatClass(Configuration conf) { - return (Class<? extends VertexOutputFormat<I, V, E>>) - conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS, - null, - VertexOutputFormat.class); - } - - /** - * Create a user vertex output format class - * - * @param conf Configuration to check - * @return Instantiated user vertex output format class - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, V extends Writable, - E extends Writable> VertexOutputFormat<I, V, E> - createVertexOutputFormat(Configuration conf) { - Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass = - getVertexOutputFormatClass(conf); - return ReflectionUtils.newInstance(vertexOutputFormatClass, conf); - } - - /** - * Get the user's subclassed {@link AggregatorWriter}. - * - * @param conf Configuration to check - * @return User's aggregator writer class - */ - public static Class<? extends AggregatorWriter> - getAggregatorWriterClass(Configuration conf) { - return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS, - TextAggregatorWriter.class, - AggregatorWriter.class); - } - - /** - * Create a user aggregator output format class - * - * @param conf Configuration to check - * @return Instantiated user aggregator writer class - */ - public static AggregatorWriter - createAggregatorWriter(Configuration conf) { - Class<? extends AggregatorWriter> aggregatorWriterClass = - getAggregatorWriterClass(conf); - return ReflectionUtils.newInstance(aggregatorWriterClass, conf); - } - - /** - * Get the user's subclassed {@link VertexCombiner}. - * - * @param conf Configuration to check - * @return User's vertex combiner class - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static <I extends WritableComparable, - M extends Writable> - Class<? extends VertexCombiner<I, M>> - getVertexCombinerClass(Configuration conf) { - return (Class<? extends VertexCombiner<I, M>>) - conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS, - null, - VertexCombiner.class); - } - - /** - * Create a user vertex combiner class - * - * @param conf Configuration to check - * @return Instantiated user vertex combiner class - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, M extends Writable> - VertexCombiner<I, M> createVertexCombiner(Configuration conf) { - Class<? extends VertexCombiner<I, M>> vertexCombinerClass = - getVertexCombinerClass(conf); - return ReflectionUtils.newInstance(vertexCombinerClass, conf); - } - - /** - * Get the user's subclassed VertexResolver. - * - * @param conf Configuration to check - * @return User's vertex resolver class - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - public static <I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> - Class<? extends VertexResolver<I, V, E, M>> - getVertexResolverClass(Configuration conf) { - return (Class<? extends VertexResolver<I, V, E, M>>) - conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS, - VertexResolver.class, - VertexResolver.class); - } - - /** - * Create a user vertex revolver - * - * @param conf Configuration to check - * @return Instantiated user vertex resolver - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> VertexResolver<I, V, E, M> - createVertexResolver(Configuration conf, - GraphState<I, V, E, M> graphState) { - Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass = - getVertexResolverClass(conf); - VertexResolver<I, V, E, M> resolver = - ReflectionUtils.newInstance(vertexResolverClass, conf); - resolver.setGraphState(graphState); - return resolver; - } - - /** - * Get the user's subclassed WorkerContext. - * - * @param conf Configuration to check - * @return User's worker context class - */ - public static Class<? extends WorkerContext> - getWorkerContextClass(Configuration conf) { - return (Class<? extends WorkerContext>) - conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS, - DefaultWorkerContext.class, - WorkerContext.class); - } - - /** - * Create a user worker context - * - * @param conf Configuration to check - * @return Instantiated user worker context - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> - WorkerContext createWorkerContext(Configuration conf, - GraphState<I, V, E, M> graphState) { - Class<? extends WorkerContext> workerContextClass = - getWorkerContextClass(conf); - WorkerContext workerContext = - ReflectionUtils.newInstance(workerContextClass, conf); - workerContext.setGraphState(graphState); - return workerContext; - } - - - /** - * Get the user's subclassed {@link BasicVertex} - * - * @param conf Configuration to check - * @return User's vertex class - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static <I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> - Class<? extends BasicVertex<I, V, E, M>> - getVertexClass(Configuration conf) { - return (Class<? extends BasicVertex<I, V, E, M>>) - conf.getClass(GiraphJob.VERTEX_CLASS, - null, - BasicVertex.class); - } - - /** - * Create a user vertex - * - * @param conf Configuration to check - * @return Instantiated user vertex - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> BasicVertex<I, V, E, M> - createVertex(Configuration conf) { - Class<? extends BasicVertex<I, V, E, M>> vertexClass = - getVertexClass(conf); - BasicVertex<I, V, E, M> vertex = - ReflectionUtils.newInstance(vertexClass, conf); - return vertex; - } - - /** - * Get the user's subclassed vertex index class. - * - * @param conf Configuration to check - * @return User's vertex index class - */ - @SuppressWarnings("unchecked") - public static <I extends Writable> Class<I> - getVertexIndexClass(Configuration conf) { - return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS, - WritableComparable.class); - } - - /** - * Create a user vertex index - * - * @param conf Configuration to check - * @return Instantiated user vertex index - */ - @SuppressWarnings("rawtypes") - public static <I extends WritableComparable> - I createVertexIndex(Configuration conf) { - Class<I> vertexClass = getVertexIndexClass(conf); - try { - return vertexClass.newInstance(); - } catch (InstantiationException e) { - throw new IllegalArgumentException( - "createVertexIndex: Failed to instantiate", e); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException( - "createVertexIndex: Illegally accessed", e); - } - } - - /** - * Get the user's subclassed vertex value class. - * - * @param conf Configuration to check - * @return User's vertex value class - */ - @SuppressWarnings("unchecked") - public static <V extends Writable> Class<V> - getVertexValueClass(Configuration conf) { - return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS, - Writable.class); - } - - /** - * Create a user vertex value - * - * @param conf Configuration to check - * @return Instantiated user vertex value - */ - public static <V extends Writable> V - createVertexValue(Configuration conf) { - Class<V> vertexValueClass = getVertexValueClass(conf); - try { - return vertexValueClass.newInstance(); - } catch (InstantiationException e) { - throw new IllegalArgumentException( - "createVertexValue: Failed to instantiate", e); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException( - "createVertexValue: Illegally accessed", e); - } - } - - /** - * Get the user's subclassed edge value class. - * - * @param conf Configuration to check - * @return User's vertex edge value class - */ - @SuppressWarnings("unchecked") - public static <E extends Writable> Class<E> - getEdgeValueClass(Configuration conf){ - return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS, - Writable.class); - } - - /** - * Create a user edge value - * - * @param conf Configuration to check - * @return Instantiated user edge value - */ - public static <E extends Writable> E - createEdgeValue(Configuration conf) { - Class<E> edgeValueClass = getEdgeValueClass(conf); - try { - return edgeValueClass.newInstance(); - } catch (InstantiationException e) { - throw new IllegalArgumentException( - "createEdgeValue: Failed to instantiate", e); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException( - "createEdgeValue: Illegally accessed", e); - } - } - - /** - * Get the user's subclassed vertex message value class. - * - * @param conf Configuration to check - * @return User's vertex message value class - */ - @SuppressWarnings("unchecked") - public static <M extends Writable> Class<M> - getMessageValueClass(Configuration conf) { - return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS, - Writable.class); - } - - /** - * Create a user vertex message value - * - * @param conf Configuration to check - * @return Instantiated user vertex message value - */ - public static <M extends Writable> M - createMessageValue(Configuration conf) { - Class<M> messageValueClass = getMessageValueClass(conf); - try { - return messageValueClass.newInstance(); - } catch (InstantiationException e) { - throw new IllegalArgumentException( - "createMessageValue: Failed to instantiate", e); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException( - "createMessageValue: Illegally accessed", e); - } + /** + * Do not construct. + */ + private BspUtils() { } + + /** + * Get the user's subclassed {@link GraphPartitionerFactory}. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return User's graph partitioner + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> + Class<? extends GraphPartitionerFactory<I, V, E, M>> + getGraphPartitionerClass(Configuration conf) { + return (Class<? extends GraphPartitionerFactory<I, V, E, M>>) + conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS, + HashPartitionerFactory.class, + GraphPartitionerFactory.class); + } + + /** + * Create a user graph partitioner class + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return Instantiated user graph partitioner class + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> + GraphPartitionerFactory<I, V, E, M> + createGraphPartitioner(Configuration conf) { + Class<? extends GraphPartitionerFactory<I, V, E, M>> + graphPartitionerFactoryClass = getGraphPartitionerClass(conf); + return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf); + } + + /** + * Create a user graph partitioner partition stats class + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return Instantiated user graph partition stats class + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> + PartitionStats createGraphPartitionStats(Configuration conf) { + GraphPartitionerFactory<I, V, E, M> graphPartitioner = + createGraphPartitioner(conf); + return graphPartitioner.createMasterGraphPartitioner(). + createPartitionStats(); + } + + /** + * Get the user's subclassed {@link VertexInputFormat}. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return User's vertex input format class + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static <I extends WritableComparable, + V extends Writable, + E extends Writable, + M extends Writable> + Class<? extends VertexInputFormat<I, V, E, M>> + getVertexInputFormatClass(Configuration conf) { + return (Class<? extends VertexInputFormat<I, V, E, M>>) + conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS, + null, + VertexInputFormat.class); + } + + /** + * Create a user vertex input format class + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return Instantiated user vertex input format class + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, + V extends Writable, + E extends Writable, + M extends Writable> VertexInputFormat<I, V, E, M> + createVertexInputFormat(Configuration conf) { + Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = + getVertexInputFormatClass(conf); + VertexInputFormat<I, V, E, M> inputFormat = + ReflectionUtils.newInstance(vertexInputFormatClass, conf); + return inputFormat; + } + + /** + * Get the user's subclassed {@link VertexOutputFormat}. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param conf Configuration to check + * @return User's vertex output format class + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static <I extends WritableComparable, + V extends Writable, + E extends Writable> + Class<? extends VertexOutputFormat<I, V, E>> + getVertexOutputFormatClass(Configuration conf) { + return (Class<? extends VertexOutputFormat<I, V, E>>) + conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS, + null, + VertexOutputFormat.class); + } + + /** + * Create a user vertex output format class + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param conf Configuration to check + * @return Instantiated user vertex output format class + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, V extends Writable, + E extends Writable> VertexOutputFormat<I, V, E> + createVertexOutputFormat(Configuration conf) { + Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass = + getVertexOutputFormatClass(conf); + return ReflectionUtils.newInstance(vertexOutputFormatClass, conf); + } + + /** + * Get the user's subclassed {@link AggregatorWriter}. + * + * @param conf Configuration to check + * @return User's aggregator writer class + */ + public static Class<? extends AggregatorWriter> + getAggregatorWriterClass(Configuration conf) { + return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS, + TextAggregatorWriter.class, + AggregatorWriter.class); + } + + /** + * Create a user aggregator output format class + * + * @param conf Configuration to check + * @return Instantiated user aggregator writer class + */ + public static AggregatorWriter createAggregatorWriter(Configuration conf) { + Class<? extends AggregatorWriter> aggregatorWriterClass = + getAggregatorWriterClass(conf); + return ReflectionUtils.newInstance(aggregatorWriterClass, conf); + } + + /** + * Get the user's subclassed {@link VertexCombiner}. + * + * @param <I> Vertex id + * @param <M> Message data + * @param conf Configuration to check + * @return User's vertex combiner class + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static <I extends WritableComparable, M extends Writable> + Class<? extends VertexCombiner<I, M>> + getVertexCombinerClass(Configuration conf) { + return (Class<? extends VertexCombiner<I, M>>) + conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS, + null, + VertexCombiner.class); + } + + /** + * Create a user vertex combiner class + * + * @param <I> Vertex id + * @param <M> Message data + * @param conf Configuration to check + * @return Instantiated user vertex combiner class + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, M extends Writable> + VertexCombiner<I, M> createVertexCombiner(Configuration conf) { + Class<? extends VertexCombiner<I, M>> vertexCombinerClass = + getVertexCombinerClass(conf); + return ReflectionUtils.newInstance(vertexCombinerClass, conf); + } + + /** + * Get the user's subclassed VertexResolver. + * + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return User's vertex resolver class + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> + Class<? extends VertexResolver<I, V, E, M>> + getVertexResolverClass(Configuration conf) { + return (Class<? extends VertexResolver<I, V, E, M>>) + conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS, + VertexResolver.class, + VertexResolver.class); + } + + /** + * Create a user vertex revolver + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @param graphState State of the graph from the worker + * @return Instantiated user vertex resolver + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> VertexResolver<I, V, E, M> + createVertexResolver(Configuration conf, + GraphState<I, V, E, M> graphState) { + Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass = + getVertexResolverClass(conf); + VertexResolver<I, V, E, M> resolver = + ReflectionUtils.newInstance(vertexResolverClass, conf); + resolver.setGraphState(graphState); + return resolver; + } + + /** + * Get the user's subclassed WorkerContext. + * + * @param conf Configuration to check + * @return User's worker context class + */ + public static Class<? extends WorkerContext> + getWorkerContextClass(Configuration conf) { + return (Class<? extends WorkerContext>) + conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS, + DefaultWorkerContext.class, + WorkerContext.class); + } + + /** + * Create a user worker context + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @param graphState State of the graph from the worker + * @return Instantiated user worker context + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> + WorkerContext createWorkerContext(Configuration conf, + GraphState<I, V, E, M> graphState) { + Class<? extends WorkerContext> workerContextClass = + getWorkerContextClass(conf); + WorkerContext workerContext = + ReflectionUtils.newInstance(workerContextClass, conf); + workerContext.setGraphState(graphState); + return workerContext; + } + + + /** + * Get the user's subclassed {@link BasicVertex} + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return User's vertex class + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> + Class<? extends BasicVertex<I, V, E, M>> getVertexClass(Configuration conf) { + return (Class<? extends BasicVertex<I, V, E, M>>) + conf.getClass(GiraphJob.VERTEX_CLASS, + null, + BasicVertex.class); + } + + /** + * Create a user vertex + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + * @param conf Configuration to check + * @return Instantiated user vertex + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> BasicVertex<I, V, E, M> + createVertex(Configuration conf) { + Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf); + BasicVertex<I, V, E, M> vertex = + ReflectionUtils.newInstance(vertexClass, conf); + return vertex; + } + + /** + * Get the user's subclassed vertex index class. + * + * @param <I> Vertex id + * @param conf Configuration to check + * @return User's vertex index class + */ + @SuppressWarnings("unchecked") + public static <I extends Writable> Class<I> + getVertexIndexClass(Configuration conf) { + return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS, + WritableComparable.class); + } + + /** + * Create a user vertex index + * + * @param <I> Vertex id + * @param conf Configuration to check + * @return Instantiated user vertex index + */ + @SuppressWarnings("rawtypes") + public static <I extends WritableComparable> + I createVertexIndex(Configuration conf) { + Class<I> vertexClass = getVertexIndexClass(conf); + try { + return vertexClass.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createVertexIndex: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createVertexIndex: Illegally accessed", e); + } + } + + /** + * Get the user's subclassed vertex value class. + * + * @param <V> Vertex data + * @param conf Configuration to check + * @return User's vertex value class + */ + @SuppressWarnings("unchecked") + public static <V extends Writable> Class<V> + getVertexValueClass(Configuration conf) { + return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS, + Writable.class); + } + + /** + * Create a user vertex value + * + * @param <V> Vertex data + * @param conf Configuration to check + * @return Instantiated user vertex value + */ + public static <V extends Writable> V + createVertexValue(Configuration conf) { + Class<V> vertexValueClass = getVertexValueClass(conf); + try { + return vertexValueClass.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createVertexValue: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createVertexValue: Illegally accessed", e); + } + } + + /** + * Get the user's subclassed edge value class. + * + * @param <E> Edge data + * @param conf Configuration to check + * @return User's vertex edge value class + */ + @SuppressWarnings("unchecked") + public static <E extends Writable> Class<E> + getEdgeValueClass(Configuration conf) { + return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS, + Writable.class); + } + + /** + * Create a user edge value + * + * @param <E> Edge data + * @param conf Configuration to check + * @return Instantiated user edge value + */ + public static <E extends Writable> E + createEdgeValue(Configuration conf) { + Class<E> edgeValueClass = getEdgeValueClass(conf); + try { + return edgeValueClass.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createEdgeValue: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createEdgeValue: Illegally accessed", e); + } + } + + /** + * Get the user's subclassed vertex message value class. + * + * @param <M> Message data + * @param conf Configuration to check + * @return User's vertex message value class + */ + @SuppressWarnings("unchecked") + public static <M extends Writable> Class<M> + getMessageValueClass(Configuration conf) { + return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS, + Writable.class); + } + + /** + * Create a user vertex message value + * + * @param <M> Message data + * @param conf Configuration to check + * @return Instantiated user vertex message value + */ + public static <M extends Writable> M + createMessageValue(Configuration conf) { + Class<M> messageValueClass = getMessageValueClass(conf); + try { + return messageValueClass.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createMessageValue: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createMessageValue: Illegally accessed", e); } + } }
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java Thu Feb 16 22:12:31 2012 @@ -25,17 +25,17 @@ package org.apache.giraph.graph; */ public class DefaultWorkerContext extends WorkerContext { - @Override - public void preApplication() throws InstantiationException, - IllegalAccessException { - } + @Override + public void preApplication() + throws InstantiationException, IllegalAccessException { + } - @Override - public void postApplication() { } + @Override + public void postApplication() { } - @Override - public void preSuperstep() { } + @Override + public void preSuperstep() { } - @Override - public void postSuperstep() { } -} \ No newline at end of file + @Override + public void postSuperstep() { } +} Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Thu Feb 16 22:12:31 2012 @@ -36,133 +36,138 @@ import java.io.IOException; */ @SuppressWarnings("rawtypes") public class Edge<I extends WritableComparable, E extends Writable> - implements WritableComparable<Edge<I, E>>, Configurable { - /** Destination vertex id */ - private I destVertexId = null; - /** Edge value */ - private E edgeValue = null; - /** Configuration - Used to instantiate classes */ - private Configuration conf = null; - - /** - * Constructor for reflection - */ - public Edge() {} - - /** - * Create the edge with final values - * - * @param destVertexId - * @param edgeValue - */ - public Edge(I destVertexId, E edgeValue) { - this.destVertexId = destVertexId; - this.edgeValue = edgeValue; - } - - /** - * Get the destination vertex index of this edge - * - * @return Destination vertex index of this edge - */ - public I getDestVertexId() { - return destVertexId; - } - - /** - * Get the edge value of the edge - * - * @return Edge value of this edge - */ - public E getEdgeValue() { - return edgeValue; - } - - /** - * Set the destination vertex index of this edge. - * - * @param destVertexId new destination vertex - */ - public void setDestVertexId(I destVertexId) { - this.destVertexId = destVertexId; - } - - /** - * Set the value for this edge. - * - * @param edgeValue new edge value - */ - public void setEdgeValue(E edgeValue) { - this.edgeValue = edgeValue; - } - - @Override - public String toString() { - return "(DestVertexIndex = " + destVertexId + - ", edgeValue = " + edgeValue + ")"; - } - - @SuppressWarnings("unchecked") - @Override - public void readFields(DataInput input) throws IOException { - destVertexId = (I) BspUtils.createVertexIndex(getConf()); - destVertexId.readFields(input); - edgeValue = (E) BspUtils.createEdgeValue(getConf()); - edgeValue.readFields(input); - } - - @Override - public void write(DataOutput output) throws IOException { - if (destVertexId == null) { - throw new IllegalStateException( - "write: Null destination vertex index"); - } - if (edgeValue == null) { - throw new IllegalStateException( - "write: Null edge value"); - } - destVertexId.write(output); - edgeValue.write(output); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @SuppressWarnings("unchecked") - @Override - public int compareTo(Edge<I, E> edge) { - return destVertexId.compareTo(edge.getDestVertexId()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { return false; } - - Edge edge = (Edge) o; - - if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) : - edge.destVertexId != null) { - return false; - } - if (edgeValue != null ? !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = destVertexId != null ? destVertexId.hashCode() : 0; - result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0); - return result; - } + implements WritableComparable<Edge<I, E>>, Configurable { + /** Destination vertex id */ + private I destVertexId = null; + /** Edge value */ + private E edgeValue = null; + /** Configuration - Used to instantiate classes */ + private Configuration conf = null; + + /** + * Constructor for reflection + */ + public Edge() { } + + /** + * Create the edge with final values + * + * @param destVertexId Desination vertex id. + * @param edgeValue Value of the edge. + */ + public Edge(I destVertexId, E edgeValue) { + this.destVertexId = destVertexId; + this.edgeValue = edgeValue; + } + + /** + * Get the destination vertex index of this edge + * + * @return Destination vertex index of this edge + */ + public I getDestVertexId() { + return destVertexId; + } + + /** + * Get the edge value of the edge + * + * @return Edge value of this edge + */ + public E getEdgeValue() { + return edgeValue; + } + + /** + * Set the destination vertex index of this edge. + * + * @param destVertexId new destination vertex + */ + public void setDestVertexId(I destVertexId) { + this.destVertexId = destVertexId; + } + + /** + * Set the value for this edge. + * + * @param edgeValue new edge value + */ + public void setEdgeValue(E edgeValue) { + this.edgeValue = edgeValue; + } + + @Override + public String toString() { + return "(DestVertexIndex = " + destVertexId + + ", edgeValue = " + edgeValue + ")"; + } + + @SuppressWarnings("unchecked") + @Override + public void readFields(DataInput input) throws IOException { + destVertexId = (I) BspUtils.createVertexIndex(getConf()); + destVertexId.readFields(input); + edgeValue = (E) BspUtils.createEdgeValue(getConf()); + edgeValue.readFields(input); + } + + @Override + public void write(DataOutput output) throws IOException { + if (destVertexId == null) { + throw new IllegalStateException( + "write: Null destination vertex index"); + } + if (edgeValue == null) { + throw new IllegalStateException( + "write: Null edge value"); + } + destVertexId.write(output); + edgeValue.write(output); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @SuppressWarnings("unchecked") + @Override + public int compareTo(Edge<I, E> edge) { + return destVertexId.compareTo(edge.getDestVertexId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Edge edge = (Edge) o; + + if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) : + edge.destVertexId != null) { + return false; + } + if (edgeValue != null ? + !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = destVertexId != null ? destVertexId.hashCode() : 0; + result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0); + return result; + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Feb 16 22:12:31 2012 @@ -48,265 +48,266 @@ import java.util.Map; */ @SuppressWarnings("rawtypes") public abstract class EdgeListVertex<I extends WritableComparable, - V extends Writable, - E extends Writable, M extends Writable> - extends MutableVertex<I, V, E, M> { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(EdgeListVertex.class); - /** Vertex id */ - private I vertexId = null; - /** Vertex value */ - private V vertexValue = null; - /** List of the dest edge indices */ - private List<I> destEdgeIndexList; - /** List of the dest edge values */ - /** Map of destination vertices and their edge values */ - private List<E> destEdgeValueList; - /** List of incoming messages from the previous superstep */ - private List<M> msgList; - - @Override - public void initialize(I vertexId, V vertexValue, - Map<I, E> edges, - Iterable<M> messages) { - if (vertexId != null) { - setVertexId(vertexId); - } - if (vertexValue != null) { - setVertexValue(vertexValue); - } - if (edges != null && !edges.isEmpty()) { - destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size()); - destEdgeValueList = Lists.newArrayListWithCapacity(edges.size()); - List<I> sortedIndexList = new ArrayList<I>(edges.keySet()); - Collections.sort(sortedIndexList, new VertexIdComparator()); - for (I index : sortedIndexList) { - destEdgeIndexList.add(index); - destEdgeValueList.add(edges.get(index)); - } - sortedIndexList.clear(); - } else { - destEdgeIndexList = Lists.newArrayListWithCapacity(0); - destEdgeValueList = Lists.newArrayListWithCapacity(0); - } - if (messages != null) { - msgList = Lists.newArrayListWithCapacity(Iterables.size(messages)); - Iterables.<M>addAll(msgList, messages); - } else { - msgList = Lists.newArrayListWithCapacity(0); - } - } - - @Override - public boolean equals(Object other) { - if (other instanceof EdgeListVertex) { - @SuppressWarnings("unchecked") - EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other; - if (!getVertexId().equals(otherVertex.getVertexId())) { - return false; - } - if (!getVertexValue().equals(otherVertex.getVertexValue())) { - return false; - } - if (!ComparisonUtils.equal(getMessages(), - otherVertex.getMessages())) { - return false; - } - return ComparisonUtils.equal(iterator(), otherVertex.iterator()); - } + V extends Writable, E extends Writable, M extends Writable> + extends MutableVertex<I, V, E, M> { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(EdgeListVertex.class); + /** Vertex id */ + private I vertexId = null; + /** Vertex value */ + private V vertexValue = null; + /** List of the dest edge indices */ + private List<I> destEdgeIndexList; + /** List of the dest edge values */ + private List<E> destEdgeValueList; + /** List of incoming messages from the previous superstep */ + private List<M> msgList; + + @Override + public void initialize(I vertexId, V vertexValue, + Map<I, E> edges, + Iterable<M> messages) { + if (vertexId != null) { + setVertexId(vertexId); + } + if (vertexValue != null) { + setVertexValue(vertexValue); + } + if (edges != null && !edges.isEmpty()) { + destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size()); + destEdgeValueList = Lists.newArrayListWithCapacity(edges.size()); + List<I> sortedIndexList = new ArrayList<I>(edges.keySet()); + Collections.sort(sortedIndexList, new VertexIdComparator()); + for (I index : sortedIndexList) { + destEdgeIndexList.add(index); + destEdgeValueList.add(edges.get(index)); + } + sortedIndexList.clear(); + } else { + destEdgeIndexList = Lists.newArrayListWithCapacity(0); + destEdgeValueList = Lists.newArrayListWithCapacity(0); + } + if (messages != null) { + msgList = Lists.newArrayListWithCapacity(Iterables.size(messages)); + Iterables.<M>addAll(msgList, messages); + } else { + msgList = Lists.newArrayListWithCapacity(0); + } + } + + @Override + public int hashCode() { + return vertexId.hashCode() * 37 + vertexValue.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other instanceof EdgeListVertex) { + @SuppressWarnings("unchecked") + EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other; + if (!getVertexId().equals(otherVertex.getVertexId())) { return false; + } + if (!getVertexValue().equals(otherVertex.getVertexValue())) { + return false; + } + if (!ComparisonUtils.equal(getMessages(), + otherVertex.getMessages())) { + return false; + } + return ComparisonUtils.equal(iterator(), otherVertex.iterator()); } + return false; + } - /** - * Comparator for the vertex id - */ - private class VertexIdComparator implements Comparator<I> { - @SuppressWarnings("unchecked") - @Override - public int compare(I index1, I index2) { - return index1.compareTo(index2); - } - } - - @Override - public final boolean addEdge(I targetVertexId, E edgeValue) { - System.out.println("addEdge: " + targetVertexId + " " + edgeValue + " " + destEdgeIndexList); - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - destEdgeIndexList.add(-1 * (pos + 1), targetVertexId); - destEdgeValueList.add(-1 * (pos + 1), edgeValue); - return true; - } else { - LOG.warn("addEdge: Vertex=" + vertexId + - ": already added an edge value for dest vertex id " + - targetVertexId); - return false; - } - } - - @Override - public long getSuperstep() { - return getGraphState().getSuperstep(); - } - - @Override - public final void setVertexId(I vertexId) { - this.vertexId = vertexId; - } - - @Override - public final I getVertexId() { - return vertexId; - } - - @Override - public final V getVertexValue() { - return vertexValue; - } - - @Override - public final void setVertexValue(V vertexValue) { - this.vertexValue = vertexValue; - } - - @Override - public E getEdgeValue(I targetVertexId) { - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - return null; - } else { - return destEdgeValueList.get(pos); - } - } - - @Override - public boolean hasEdge(I targetVertexId) { - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - return false; - } else { - return true; - } - } - - /** - * Get an iterator to the edges on this vertex. - * - * @return A <em>sorted</em> iterator, as defined by the sort-order - * of the vertex ids - */ - @Override - public Iterator<I> iterator() { - return destEdgeIndexList.iterator(); - } - - @Override - public int getNumOutEdges() { - return destEdgeIndexList.size(); - } - - @Override - public E removeEdge(I targetVertexId) { - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - return null; - } else { - destEdgeIndexList.remove(pos); - return destEdgeValueList.remove(pos); - } - } - - @Override - public final void sendMsgToAllEdges(M msg) { - if (msg == null) { - throw new IllegalArgumentException( - "sendMsgToAllEdges: Cannot send null message to all edges"); - } - for (I index : destEdgeIndexList) { - sendMsg(index, msg); - } - } - - @Override - final public void readFields(DataInput in) throws IOException { - vertexId = BspUtils.<I>createVertexIndex(getConf()); - vertexId.readFields(in); - boolean hasVertexValue = in.readBoolean(); - if (hasVertexValue) { - vertexValue = BspUtils.<V>createVertexValue(getConf()); - vertexValue.readFields(in); - } - int edgeListCount = in.readInt(); - destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount); - destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount); - for (int i = 0; i < edgeListCount; ++i) { - I vertexId = BspUtils.<I>createVertexIndex(getConf()); - E edgeValue = BspUtils.<E>createEdgeValue(getConf()); - vertexId.readFields(in); - edgeValue.readFields(in); - destEdgeIndexList.add(vertexId); - destEdgeValueList.add(edgeValue); - } - int msgListSize = in.readInt(); - msgList = Lists.newArrayListWithCapacity(msgListSize); - for (int i = 0; i < msgListSize; ++i) { - M msg = BspUtils.<M>createMessageValue(getConf()); - msg.readFields(in); - msgList.add(msg); - } - halt = in.readBoolean(); - } - - @Override - final public void write(DataOutput out) throws IOException { - vertexId.write(out); - out.writeBoolean(vertexValue != null); - if (vertexValue != null) { - vertexValue.write(out); - } - out.writeInt(destEdgeIndexList.size()); - for (int i = 0 ; i < destEdgeIndexList.size(); ++i) { - destEdgeIndexList.get(i).write(out); - destEdgeValueList.get(i).write(out); - } - out.writeInt(msgList.size()); - for (M msg : msgList) { - msg.write(out); - } - out.writeBoolean(halt); - } - - @Override - void putMessages(Iterable<M> messages) { - msgList.clear(); - for (M message : messages) { - msgList.add(message); - } - } - - @Override - public Iterable<M> getMessages() { - return Iterables.unmodifiableIterable(msgList); - } - - @Override - void releaseResources() { - // Hint to GC to free the messages - msgList.clear(); - } - - @Override - public String toString() { - return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + - ",#edges=" + getNumOutEdges() + ")"; - } + /** + * Comparator for the vertex id + */ + private class VertexIdComparator implements Comparator<I> { + @SuppressWarnings("unchecked") + @Override + public int compare(I index1, I index2) { + return index1.compareTo(index2); + } + } + + @Override + public final boolean addEdge(I targetVertexId, E edgeValue) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos < 0) { + destEdgeIndexList.add(-1 * (pos + 1), targetVertexId); + destEdgeValueList.add(-1 * (pos + 1), edgeValue); + return true; + } else { + LOG.warn("addEdge: Vertex=" + vertexId + + ": already added an edge value for dest vertex id " + + targetVertexId); + return false; + } + } + + @Override + public long getSuperstep() { + return getGraphState().getSuperstep(); + } + + @Override + public final void setVertexId(I vertexId) { + this.vertexId = vertexId; + } + + @Override + public final I getVertexId() { + return vertexId; + } + + @Override + public final V getVertexValue() { + return vertexValue; + } + + @Override + public final void setVertexValue(V vertexValue) { + this.vertexValue = vertexValue; + } + + @Override + public E getEdgeValue(I targetVertexId) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos < 0) { + return null; + } else { + return destEdgeValueList.get(pos); + } + } + + @Override + public boolean hasEdge(I targetVertexId) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos < 0) { + return false; + } + return true; + } + + /** + * Get an iterator to the edges on this vertex. + * + * @return A <em>sorted</em> iterator, as defined by the sort-order + * of the vertex ids + */ + @Override + public Iterator<I> iterator() { + return destEdgeIndexList.iterator(); + } + + @Override + public int getNumOutEdges() { + return destEdgeIndexList.size(); + } + + @Override + public E removeEdge(I targetVertexId) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos < 0) { + return null; + } else { + destEdgeIndexList.remove(pos); + return destEdgeValueList.remove(pos); + } + } + + @Override + public final void sendMsgToAllEdges(M msg) { + if (msg == null) { + throw new IllegalArgumentException( + "sendMsgToAllEdges: Cannot send null message to all edges"); + } + for (I index : destEdgeIndexList) { + sendMsg(index, msg); + } + } + + @Override + public final void readFields(DataInput in) throws IOException { + vertexId = BspUtils.<I>createVertexIndex(getConf()); + vertexId.readFields(in); + boolean hasVertexValue = in.readBoolean(); + if (hasVertexValue) { + vertexValue = BspUtils.<V>createVertexValue(getConf()); + vertexValue.readFields(in); + } + int edgeListCount = in.readInt(); + destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount); + destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount); + for (int i = 0; i < edgeListCount; ++i) { + I destVertexId = BspUtils.<I>createVertexIndex(getConf()); + E edgeValue = BspUtils.<E>createEdgeValue(getConf()); + destVertexId.readFields(in); + edgeValue.readFields(in); + destEdgeIndexList.add(destVertexId); + destEdgeValueList.add(edgeValue); + } + int msgListSize = in.readInt(); + msgList = Lists.newArrayListWithCapacity(msgListSize); + for (int i = 0; i < msgListSize; ++i) { + M msg = BspUtils.<M>createMessageValue(getConf()); + msg.readFields(in); + msgList.add(msg); + } + halt = in.readBoolean(); + } + + @Override + public final void write(DataOutput out) throws IOException { + vertexId.write(out); + out.writeBoolean(vertexValue != null); + if (vertexValue != null) { + vertexValue.write(out); + } + out.writeInt(destEdgeIndexList.size()); + for (int i = 0; i < destEdgeIndexList.size(); ++i) { + destEdgeIndexList.get(i).write(out); + destEdgeValueList.get(i).write(out); + } + out.writeInt(msgList.size()); + for (M msg : msgList) { + msg.write(out); + } + out.writeBoolean(halt); + } + + @Override + void putMessages(Iterable<M> messages) { + msgList.clear(); + for (M message : messages) { + msgList.add(message); + } + } + + @Override + public Iterable<M> getMessages() { + return Iterables.unmodifiableIterable(msgList); + } + + @Override + void releaseResources() { + // Hint to GC to free the messages + msgList.clear(); + } + + @Override + public String toString() { + return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + + ",#edges=" + getNumOutEdges() + ")"; + } }
