[FLINK-3207] [gelly] add pregel iteration abstraction to gelly

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5ffb5dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5ffb5dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5ffb5dd

Branch: refs/heads/master
Commit: c5ffb5ddd06b62f639145f8698c71fc74ff8c9b6
Parents: 35892ed
Author: vasia <[email protected]>
Authored: Tue Feb 2 12:13:51 2016 +0100
Committer: vasia <[email protected]>
Committed: Mon Mar 21 19:10:29 2016 +0100

----------------------------------------------------------------------
 .../flink/graph/pregel/ComputeFunction.java     | 266 ++++++++++
 .../flink/graph/pregel/MessageCombiner.java     |  71 +++
 .../flink/graph/pregel/MessageIterator.java     |  77 +++
 .../pregel/VertexCentricConfiguration.java      |  64 +++
 .../graph/pregel/VertexCentricIteration.java    | 504 +++++++++++++++++++
 .../flink/graph/spargel/MessagingFunction.java  |   8 +-
 6 files changed, 986 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
new file mode 100644
index 0000000..09a000f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.pregel;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * The base class for the message-passing functions between vertices as a part 
of a {@link VertexCentricIteration}.
+ * 
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <EV> The type of the values that are associated with the edges.
+ * @param <Message> The type of the message sent between vertices along the 
edges.
+ */
+public abstract class ComputeFunction<K, VV, EV, Message> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // --------------------------------------------------------------------------------------------
+       //  Public API Methods
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * This method is invoked once per superstep, for each active vertex.
+        * A vertex is active during a superstep, if at least one message was produced for it,
+        * in the previous superstep. During the first superstep, all vertices are active.
+        * <p>
+        * This method can iterate over all received messages, set the new vertex value, and
+        * send messages to other vertices (which will be delivered in the next superstep).
+        *
+        * @param vertex The vertex executing this function
+        * @param messages The messages that were sent to this vertex in the previous superstep
+        * @throws Exception if the user-defined computation fails
+        */
+       public abstract void compute(Vertex<K, VV> vertex, MessageIterator<Message> messages) throws Exception;
+
+       /**
+        * This method is executed once per superstep before the compute function is invoked for each vertex.
+        *
+        * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+        */
+       public void preSuperstep() throws Exception {}
+
+       /**
+        * This method is executed once per superstep after the compute function has been invoked for each vertex.
+        *
+        * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+        */
+       public void postSuperstep() throws Exception {}
+
+       /**
+        * Gets an {@link java.lang.Iterable} with all out-going edges. This method is mutually exclusive with
+        * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+        * <p>
+        * The returned iterable hands out the same mutable {@link Edge} instance on every step,
+        * so callers must copy an edge if they need to keep it beyond the current iteration step.
+        *
+        * @return An iterable over all out-going edges of the current vertex.
+        * @throws IllegalStateException if the edges were already consumed for the current vertex.
+        */
+       public final Iterable<Edge<K, EV>> getEdges() {
+               if (edgesUsed) {
+                       throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+               }
+               edgesUsed = true;
+               // 'edges' already has the required static type, no cast needed
+               this.edgeIterator.set(edges);
+               return this.edgeIterator;
+       }
+
+       /**
+        * Sends the given message to all vertices that are targets of an edge of the current vertex.
+        * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
+        *
+        * @param m The message to send.
+        * @throws IllegalStateException if the edges were already consumed for the current vertex.
+        */
+       public final void sendMessageToAllNeighbors(Message m) {
+               if (edgesUsed) {
+                       // single literal: the former concatenation dropped the space before "exactly once."
+                       throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+               }
+
+               edgesUsed = true;
+
+               // the message payload is the same for every neighbor; only the target id changes
+               outMsg.setField(m, 1);
+
+               while (edges.hasNext()) {
+                       Tuple next = (Tuple) edges.next();
+                       // field 1 of an edge tuple is the target vertex id
+                       outMsg.setField(next.getField(1), 0);
+                       out.collect(Either.Right(outMsg));
+               }
+       }
+
+       /**
+        * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+        * the next superstep will cause an exception due to a non-deliverable message.
+        *
+        * @param target The key (id) of the target vertex to message.
+        * @param m The message.
+        */
+       public final void sendMessageTo(K target, Message m) {
+
+               outMsg.setField(target, 0);
+               outMsg.setField(m, 1);
+
+               out.collect(Either.Right(outMsg));
+       }
+
+       /**
+        * Sets the new value of this vertex.
+        *
+        * This should be called at most once per invocation of
+        * {@link #compute(Vertex, MessageIterator)}.
+        *
+        * @param newValue The new vertex value.
+        * @throws IllegalStateException if a new value was already set for the current vertex.
+        */
+       public final void setNewVertexValue(VV newValue) {
+               if (setNewVertexValueCalled) {
+                       throw new IllegalStateException("setNewVertexValue should only be called at most once per compute() invocation");
+               }
+               setNewVertexValueCalled = true;
+
+               outVertex.setField(newValue, 1);
+
+               out.collect(Either.Left(outVertex));
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Gets the number of the superstep, starting at <tt>1</tt>.
+        *
+        * @return The number of the current superstep.
+        */
+       public final int getSuperstepNumber() {
+               return this.runtimeContext.getSuperstepNumber();
+       }
+
+       /**
+        * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+        * all aggregates globally once per superstep and makes them available in the next superstep.
+        *
+        * @param name The name of the aggregator.
+        * @return The aggregator registered under this name, or null, if no aggregator was registered.
+        */
+       public final <T extends Aggregator<?>> T getIterationAggregator(String name) {
+               return this.runtimeContext.<T>getIterationAggregator(name);
+       }
+
+       /**
+        * Get the aggregated value that an aggregator computed in the previous iteration.
+        *
+        * @param name The name of the aggregator.
+        * @return The aggregated value of the previous iteration.
+        */
+       public final <T extends Value> T getPreviousIterationAggregate(String name) {
+               return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+       }
+
+       /**
+        * Gets the broadcast data set registered under the given name. Broadcast data sets
+        * are available on all parallel instances of a function. They can be registered via
+        * {@link VertexCentricConfiguration#addBroadcastSet(String, org.apache.flink.api.java.DataSet)}.
+        *
+        * @param name The name under which the broadcast set is registered.
+        * @return The broadcast data set.
+        */
+       public final <T> Collection<T> getBroadcastSet(String name) {
+               return this.runtimeContext.<T>getBroadcastVariable(name);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  internal methods and state
+       // --------------------------------------------------------------------------------------------
+
+       /** Reusable output record for vertex value updates. */
+       private Vertex<K, VV> outVertex;
+
+       /** Reusable output record for messages: (target vertex id, message). */
+       private Tuple2<K, Message> outMsg;
+
+       private IterationRuntimeContext runtimeContext;
+
+       /** Out-going edges of the vertex currently being processed. */
+       private Iterator<Edge<K, EV>> edges;
+
+       /** Receives both vertex updates (Left) and messages (Right). */
+       private Collector<Either<?, ?>> out;
+
+       private EdgesIterator<K, EV> edgeIterator;
+
+       /** Guards the one-shot edge iterator; reset for every vertex in {@code set}. */
+       private boolean edgesUsed;
+
+       /** Guards against multiple value updates; reset for every vertex in {@code set}. */
+       private boolean setNewVertexValueCalled;
+
+       /**
+        * Stores the runtime context and allocates the reusable output objects.
+        *
+        * @param context The iteration runtime context used by the accessor methods of this class.
+        */
+       void init(IterationRuntimeContext context) {
+               this.runtimeContext = context;
+               this.outVertex = new Vertex<K, VV>();
+               this.outMsg = new Tuple2<K, Message>();
+               this.edgeIterator = new EdgesIterator<K, EV>();
+       }
+
+       /**
+        * Prepares this function for the next vertex: records its id, edges and output collector
+        * and resets the per-vertex usage flags.
+        *
+        * @param vertexId The id of the vertex about to be processed.
+        * @param edges The out-going edges of that vertex.
+        * @param out The collector that receives vertex updates and messages.
+        */
+       @SuppressWarnings("unchecked")
+       void set(K vertexId, Iterator<Edge<K, EV>> edges,
+                       Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) {
+
+               this.outVertex.setField(vertexId, 0);
+               this.edges = edges;
+               // widened to Either<?, ?> so that both Left and Right records can be emitted
+               this.out = (Collector<Either<?, ?>>) (Collector<?>) out;
+               this.edgesUsed = false;
+               setNewVertexValueCalled = false;
+       }
+
+       /**
+        * Iterator/Iterable adapter over the edges of the current vertex. Every call to
+        * {@link #next()} copies the fields of the underlying edge into one reused
+        * {@link Edge} instance and returns that instance.
+        */
+       private static final class EdgesIterator<K, EV>
+               implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>> {
+
+               private Iterator<Edge<K, EV>> input;
+
+               private Edge<K, EV> edge = new Edge<K, EV>();
+
+               void set(Iterator<Edge<K, EV>> input) {
+                       this.input = input;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return input.hasNext();
+               }
+
+               @Override
+               public Edge<K, EV> next() {
+                       Edge<K, EV> next = input.next();
+                       edge.setSource(next.f0);
+                       edge.setTarget(next.f1);
+                       edge.setValue(next.f2);
+                       return edge;
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Iterator<Edge<K, EV>> iterator() {
+                       return this;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
new file mode 100644
index 0000000..9458323
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.pregel;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * The base class for combining messages sent during a {@link VertexCentricIteration}.
+ *
+ * @param <K> The type of the vertex id
+ * @param <Message> The type of the message sent between vertices along the edges.
+ */
+public abstract class MessageCombiner<K, Message> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Collector that receives the combined messages. */
+       private Collector<Tuple2<K, Either<NullValue, Message>>> out;
+
+       /** Id of the vertex whose messages are currently being combined. */
+       private K vertexId;
+
+       /** Output record (target vertex id, message), reused across sendCombinedMessage calls. */
+       private Tuple2<K, Either<NullValue, Message>> outValue;
+
+       /**
+        * Prepares the combiner for the messages of one target vertex.
+        *
+        * @param target The id of the vertex the messages are addressed to.
+        * @param collector The collector that receives the combined messages.
+        */
+       void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) {
+               this.vertexId = target;
+               this.out = collector;
+               this.outValue = new Tuple2<K, Either<NullValue, Message>>();
+               outValue.setField(vertexId, 0);
+       }
+
+       /**
+        * Combines messages sent from different vertices to a target vertex.
+        * Implementing this method might reduce communication costs during a vertex-centric
+        * iteration.
+        *
+        * @param messages the input messages to combine
+        * @throws Exception if the user-defined combine logic fails
+        */
+       public abstract void combineMessages(MessageIterator<Message> messages) throws Exception;
+
+       /**
+        * Sends the combined message to the target vertex.
+        *
+        * @param combinedMessage the message to emit for the current target vertex
+        */
+       public final void sendCombinedMessage(Message combinedMessage) {
+               outValue.setField(Either.Right(combinedMessage), 1);
+               out.collect(outValue);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
new file mode 100644
index 0000000..620cb93
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.pregel;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.types.NullValue;
+
+/**
+ * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
+ * the <i>foreach</i> syntax.
+ * <p>
+ * An optional "first" message, if set, is returned before any message of the underlying source.
+ */
+public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
+       private static final long serialVersionUID = 1L;
+
+       /** Underlying records; the message is the Right value of field f1. */
+       private transient Iterator<Tuple2<?, Either<NullValue, Message>>> source;
+
+       /** Message handed out ahead of the source; null once consumed or if never set. */
+       private Message first = null;
+
+       final void setSource(Iterator<Tuple2<?, Either<NullValue, Message>>> source) {
+               this.source = source;
+       }
+
+       final void setFirst(Message msg) {
+               this.first = msg;
+       }
+
+       @Override
+       public final boolean hasNext() {
+               if (first != null) {
+                       return true;
+               }
+               else if (this.source != null) {
+                       return this.source.hasNext();
+               }
+               else {
+                       return false;
+               }
+       }
+
+       /**
+        * Returns the pending first message if there is one, otherwise the next message of the source.
+        *
+        * @return The next message.
+        * @throws java.util.NoSuchElementException if no first message is pending and no source has been set.
+        */
+       @Override
+       public final Message next() {
+               if (first != null) {
+                       Message toReturn = first;
+                       first = null;
+                       return toReturn;
+               }
+               if (this.source == null) {
+                       // honor the Iterator contract instead of failing with a NullPointerException
+                       throw new java.util.NoSuchElementException();
+               }
+               return this.source.next().f1.right();
+       }
+
+       @Override
+       public final void remove() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Iterator<Message> iterator() {
+               return this;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
new file mode 100644
index 0000000..287f20d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.pregel;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.IterationConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A VertexCentricConfiguration object can be used to set the iteration name 
and
+ * degree of parallelism, to register aggregators and use broadcast sets in
+ * the {@link org.apache.flink.graph.pregel.ComputeFunction}.
+ *
+ * The VertexCentricConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runVertexCentricIteration (
+ * org.apache.flink.graph.pregel.ComputeFunction, int, 
VertexCentricConfiguration)}.
+ */
+public class VertexCentricConfiguration extends IterationConfiguration {
+
+       /** Broadcast data sets for the compute function, each paired with its registration name. */
+       private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<Tuple2<String, DataSet<?>>>();
+
+       public VertexCentricConfiguration() {}
+
+       /**
+        * Registers a data set as a broadcast set of the compute function.
+        *
+        * @param name The name under which the compute function can look up the broadcast data.
+        * @param data The data set to broadcast.
+        */
+       public void addBroadcastSet(String name, DataSet<?> data) {
+               Tuple2<String, DataSet<?>> namedSet = new Tuple2<String, DataSet<?>>(name, data);
+               bcVars.add(namedSet);
+       }
+
+       /**
+        * Returns the broadcast variables registered for the compute function.
+        *
+        * @return a List of Tuple2 in registration order, where the first field is the
+        * broadcast variable name and the second field is the broadcast DataSet.
+        */
+       public List<Tuple2<String, DataSet<?>>> getBcastVars() {
+               return bcVars;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
new file mode 100644
index 0000000..c79ace7
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.pregel;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents iterative graph computations, programmed in a 
vertex-centric perspective.
+ * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The 
paradigm has also been
+ * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
+ * <p>
+ * Vertex centric algorithms operate on graphs, which are defined through 
vertices and edges. The 
+ * algorithms send messages along the edges and update the state of vertices 
based on
+ * the old state and the incoming messages. All vertices have an initial state.
+ * The computation terminates once no vertex receives any message anymore.
+ * Additionally, a maximum number of iterations (supersteps) may be specified.
+ * <p>
+ * The computation is here represented by one function:
+ * <ul>
+ *   <li>The {@link ComputeFunction} receives incoming messages, may update 
the state for
+ *   the vertex, and sends messages along the edges of the vertex.
+ *   </li>
+ * </ul>
+ * <p>
+ *
+ * Vertex-centric graph iterations are run by calling
+ * {@link Graph#runVertexCentricIteration(ComputeFunction, int)}.
+ *
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the 
edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public class VertexCentricIteration<K, VV, EV, Message> 
+       implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> {
+
+       private final ComputeFunction<K, VV, EV, Message> computeFunction;
+
+       private final MessageCombiner<K, Message> combineFunction;
+       
+       private final DataSet<Edge<K, EV>> edgesWithValue;
+       
+       private final int maximumNumberOfIterations;
+       
+       private final TypeInformation<Message> messageType;
+       
+       private DataSet<Vertex<K, VV>> initialVertices;
+
+       private VertexCentricConfiguration configuration;
+
+       // 
----------------------------------------------------------------------------------
+       
+       /**
+        * Creates the iteration; validates the arguments and derives the message type
+        * from the given compute function.
+        *
+        * @param cf The compute function; must not be null.
+        * @param edgesWithValue The edges of the graph; must not be null.
+        * @param mc The message combiner; stored as given, without a null check.
+        * @param maximumNumberOfIterations The maximum number of supersteps; must be positive.
+        */
+       private VertexCentricIteration(ComputeFunction<K, VV, EV, Message> cf,
+                       DataSet<Edge<K, EV>> edgesWithValue, MessageCombiner<K, Message> mc,
+                       int maximumNumberOfIterations) {
+
+               // checkNotNull hands back its argument, so validation and assignment are fused;
+               // the checks still run in the order: cf, edgesWithValue, maximumNumberOfIterations
+               this.computeFunction = Preconditions.checkNotNull(cf);
+               this.edgesWithValue = Preconditions.checkNotNull(edgesWithValue);
+               Preconditions.checkArgument(maximumNumberOfIterations > 0,
+                               "The maximum number of iterations must be at least one.");
+
+               this.combineFunction = mc;
+               this.maximumNumberOfIterations = maximumNumberOfIterations;
+               this.messageType = getMessageType(cf);
+       }
+       
+       /**
+        * Asks the {@link TypeExtractor} for the type information of the {@code Message}
+        * type parameter of the given compute function.
+        *
+        * @param cf The compute function whose message type is requested.
+        * @return The type information the extractor reports for the message type.
+        */
+       private TypeInformation<Message> getMessageType(ComputeFunction<K, VV, EV, Message> cf) {
+               // 'Message' is the fourth type parameter of ComputeFunction<K, VV, EV, Message>
+               final int messageTypeParameterPosition = 3;
+               return TypeExtractor.createTypeInfo(
+                               cf, ComputeFunction.class, cf.getClass(), messageTypeParameterPosition);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom Operator behavior
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Sets the input data set of this operator, i.e. the set of vertices
+        * with their initial state.
+        *
+        * @param inputData The vertices with their initial state.
+        *
+        * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
+        */
+       @Override
+       public void setInput(DataSet<Vertex<K, VV>> inputData) {
+               initialVertices = inputData;
+       }
+       
+       /**
+        * Creates the operator that represents this vertex-centric graph 
computation.
+        * 
+        * @return The operator that represents this vertex-centric graph 
computation.
+        */
+       @Override
+       public DataSet<Vertex<K, VV>> createResult() {
+               // the vertex input is mandatory; refuse to build the plan without it
+               if (this.initialVertices == null) {
+                       throw new IllegalStateException("The input data set has not been set.");
+               }
+
+               // ---- type information for keys, vertices, messages and intermediate records ----
+               final TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+               final TypeInformation<Vertex<K, VV>> vertexType = initialVertices.getType();
+               final TypeInformation<Tuple2<K, Message>> messageTypeInfo =
+                               new TupleTypeInfo<Tuple2<K, Message>>(keyType, messageType);
+               final TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> intermediateTypeInfo =
+                               new EitherTypeInfo<Vertex<K, VV>, Tuple2<K, Message>>(vertexType, messageTypeInfo);
+               final TypeInformation<Either<NullValue, Message>> nullableMsgTypeInfo =
+                               new EitherTypeInfo<NullValue, Message>(
+                                               TypeExtractor.getForClass(NullValue.class), messageType);
+               final TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo =
+                               new TupleTypeInfo<Tuple2<K, Either<NullValue, Message>>>(keyType, nullableMsgTypeInfo);
+               final TypeInformation<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> vertexWithMsgTypeInfo =
+                               new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>(
+                                               vertexType, nullableMsgTypeInfo);
+
+               // ---- initial workset: one (vertexId, Left(NullValue)) record per input vertex ----
+               final DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = initialVertices
+                               .map(new InitializeWorkSet<K, VV, Message>())
+                               .returns(workSetTypeInfo);
+
+               // ---- delta iteration: vertices form the solution set, keyed on field 0 ----
+               final DeltaIteration<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> iteration =
+                               initialVertices.iterateDelta(initialWorkSet, this.maximumNumberOfIterations, 0);
+               setUpIteration(iteration);
+
+               // attach the current vertex state to every workset record
+               final DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs =
+                               iteration.getSolutionSet().join(iteration.getWorkset())
+                               .where(0).equalTo(0)
+                               .with(new AppendVertexState<K, VV, Message>())
+                               .returns(vertexWithMsgTypeInfo);
+
+               // run the user compute function per vertex, together with that vertex' edges
+               CoGroupOperator<?, ?, Either<Vertex<K, VV>, Tuple2<K, Message>>> superstepComputation =
+                               verticesWithMsgs.coGroup(edgesWithValue)
+                               .where("f0.f0").equalTo(0)
+                               .with(new VertexComputeUdf<K, VV, EV, Message>(computeFunction, intermediateTypeInfo));
+
+               // Left results are updated vertices: they form the solution set delta
+               final DataSet<Vertex<K, VV>> solutionSetDelta = superstepComputation
+                               .flatMap(new ProjectNewVertexValue<K, VV, Message>())
+                               .returns(vertexType);
+
+               // Right results are messages: they form the inbox for the next superstep
+               final DataSet<Tuple2<K, Either<NullValue, Message>>> allMessages = superstepComputation
+                               .flatMap(new ProjectMessages<K, VV, Message>())
+                               .returns(workSetTypeInfo);
+
+               // the new workset is either the raw messages or, if a combiner was given, the combined ones
+               final DataSet<Tuple2<K, Either<NullValue, Message>>> newWorkSet;
+               if (combineFunction == null) {
+                       newWorkSet = allMessages;
+               } else {
+                       newWorkSet = allMessages
+                                       .groupBy(0)
+                                       .reduceGroup(new MessageCombinerUdf<K, Message>(combineFunction, workSetTypeInfo))
+                                       .setCombinable(true);
+               }
+
+               // name the compute operator and hand it the configured broadcast sets
+               superstepComputation = superstepComputation.name("Compute Function");
+               if (this.configuration != null) {
+                       for (Tuple2<String, DataSet<?>> bcastVar : this.configuration.getBcastVars()) {
+                               superstepComputation = superstepComputation.withBroadcastSet(bcastVar.f1, bcastVar.f0);
+                       }
+               }
+
+               return iteration.closeWith(solutionSetDelta, newWorkSet);
+       }
+
+       /**
+        * Creates a new vertex-centric iteration operator.
+        *
+        * @param edgesWithValue The data set containing edges.
+        * @param cf The compute function that is invoked on each vertex with its incoming messages.
+        * @param maximumNumberOfIterations The maximum number of iterations (supersteps) to run.
+        *
+        * @param <K> The type of the vertex key (the vertex identifier).
+        * @param <VV> The type of the vertex value (the state of the vertex).
+        * @param <Message> The type of the message sent between vertices along the edges.
+        * @param <EV> The type of the values that are associated with the edges.
+        *
+        * @return An instance of the vertex-centric graph computation operator.
+        */
+       public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
+               DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf,
+               int maximumNumberOfIterations) {
+
+               // this variant runs without a message combiner
+               final MessageCombiner<K, Message> noCombiner = null;
+
+               return new VertexCentricIteration<K, VV, EV, Message>(
+                               cf, edgesWithValue, noCombiner, maximumNumberOfIterations);
+       }
+
+       /**
+        * Creates a new vertex-centric iteration operator for graphs where the edges are associated
+        * with a value (such as a weight or distance).
+        *
+        * @param edgesWithValue The data set containing edges.
+        * @param cf The compute function that is invoked on each vertex with its incoming messages.
+        * @param mc The function that combines messages sent to a vertex during a superstep.
+        * @param maximumNumberOfIterations The maximum number of iterations (supersteps) to run.
+        *
+        * @param <K> The type of the vertex key (the vertex identifier).
+        * @param <VV> The type of the vertex value (the state of the vertex).
+        * @param <Message> The type of the message sent between vertices along the edges.
+        * @param <EV> The type of the values that are associated with the edges.
+        *
+        * @return An instance of the vertex-centric graph computation operator.
+        */
+       public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
+               DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf,
+               MessageCombiner<K, Message> mc, int maximumNumberOfIterations) {
+
+               // build the operator with the supplied combiner
+               final VertexCentricIteration<K, VV, EV, Message> iteration =
+                               new VertexCentricIteration<K, VV, EV, Message>(
+                                               cf, edgesWithValue, mc, maximumNumberOfIterations);
+               return iteration;
+       }
+
+       /**
+        * Stores the given configuration on this vertex-centric iteration. It is read
+        * when the iteration is set up and when the result operator is created.
+        *
+        * @param parameters the configuration parameters
+        */
+       public void configure(VertexCentricConfiguration parameters) {
+               configuration = parameters;
+       }
+
+       /**
+        * Returns the configuration previously stored on this iteration.
+        *
+        * @return the configuration parameters of this vertex-centric iteration
+        */
+       public VertexCentricConfiguration getIterationConfiguration() {
+               return configuration;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Wrapping UDFs
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Maps every input vertex to a workset record (vertexId, Left(NullValue)),
+        * i.e. an entry for the vertex that carries no message.
+        */
+       @SuppressWarnings("serial")
+       private static class InitializeWorkSet<K, VV, Message> extends
+               RichMapFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> {
+
+               // both objects are created in open() and reused for every record
+               private Either<NullValue, Message> nullMessage;
+               private Tuple2<K, Either<NullValue, Message>> outTuple;
+
+               @Override
+               public void open(Configuration parameters) {
+                       nullMessage = Either.Left(NullValue.getInstance());
+                       outTuple = new Tuple2<K, Either<NullValue, Message>>();
+               }
+
+               @Override
+               public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) {
+                       outTuple.f0 = vertex.getId();
+                       outTuple.f1 = nullMessage;
+                       return outTuple;
+               }
+       }
+       
+       /**
+        * CoGroup wrapper around the user {@link ComputeFunction}: for each vertex key it
+        * receives the (vertex state, message) records and the vertex' edges, and invokes
+        * the compute function once with the vertex and an iterator over its messages.
+        */
+       @SuppressWarnings("serial")
+       private static class VertexComputeUdf<K, VV, EV, Message> extends RichCoGroupFunction<
+               Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>,
+               Either<Vertex<K, VV>, Tuple2<K, Message>>>
+               implements ResultTypeQueryable<Either<Vertex<K, VV>, Tuple2<K, Message>>> {
+
+               final ComputeFunction<K, VV, EV, Message> computeFunction;
+
+               // only handed out through getProducedType(); excluded from serialization
+               private transient TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> resultType;
+
+               private VertexComputeUdf(ComputeFunction<K, VV, EV, Message> compute,
+                               TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> typeInfo) {
+
+                       this.resultType = typeInfo;
+                       this.computeFunction = compute;
+               }
+
+               @Override
+               public TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> getProducedType() {
+                       return resultType;
+               }
+
+               /**
+                * Initializes the compute function in the first superstep and then runs its
+                * pre-superstep hook.
+                */
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
+                       if (firstSuperstep) {
+                               computeFunction.init(getIterationRuntimeContext());
+                       }
+                       computeFunction.preSuperstep();
+               }
+
+               /** Runs the compute function's post-superstep hook. */
+               @Override
+               public void close() throws Exception {
+                       computeFunction.postSuperstep();
+               }
+
+               @Override
+               public void coGroup(
+                               Iterable<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> messages,
+                               Iterable<Edge<K, EV>> edgesIterator,
+                               Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) throws Exception {
+
+                       final Iterator<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> vertexIter =
+                                       messages.iterator();
+
+                       // no (vertex, message) record for this key: nothing to compute
+                       if (!vertexIter.hasNext()) {
+                               return;
+                       }
+
+                       final Tuple2<Vertex<K, VV>, Either<NullValue, Message>> first = vertexIter.next();
+                       final Vertex<K, VV> vertexState = first.f0;
+                       final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+
+                       // In the first superstep the message iterator is left without a source;
+                       // from the second superstep on it is fed the first message and the remaining records.
+                       if (getIterationRuntimeContext().getSuperstepNumber() != 1) {
+                               messageIter.setFirst(first.f1.right());
+
+                               @SuppressWarnings("unchecked")
+                               Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter =
+                                               (Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) vertexIter;
+                               messageIter.setSource(downcastIter);
+                       }
+
+                       computeFunction.set(vertexState.getId(), edgesIterator.iterator(), out);
+                       computeFunction.compute(vertexState, messageIter);
+               }
+       }
+
+       /**
+        * Group-reduce / group-combine wrapper around the user {@link MessageCombiner}:
+        * hands all messages addressed to one vertex to the combiner function.
+        */
+       @SuppressWarnings("serial")
+       @ForwardedFields("f0")
+       public static class MessageCombinerUdf<K, Message> extends RichGroupReduceFunction<
+               Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>>
+               implements ResultTypeQueryable<Tuple2<K, Either<NullValue, Message>>>,
+               GroupCombineFunction<Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> {
+
+               final MessageCombiner<K, Message> combinerFunction;
+
+               // only handed out through getProducedType(); excluded from serialization
+               private transient TypeInformation<Tuple2<K, Either<NullValue, Message>>> resultType;
+
+               private MessageCombinerUdf(MessageCombiner<K, Message> combineFunction,
+                               TypeInformation<Tuple2<K, Either<NullValue, Message>>> messageTypeInfo) {
+
+                       this.resultType = messageTypeInfo;
+                       this.combinerFunction = combineFunction;
+               }
+
+               @Override
+               public TypeInformation<Tuple2<K, Either<NullValue, Message>>> getProducedType() {
+                       return this.resultType;
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple2<K, Either<NullValue, Message>>> messages,
+                               Collector<Tuple2<K, Either<NullValue, Message>>> out) throws Exception {
+
+                       final Iterator<Tuple2<K, Either<NullValue, Message>>> messageIterator = messages.iterator();
+
+                       // empty group: nothing to combine
+                       if (!messageIterator.hasNext()) {
+                               return;
+                       }
+
+                       // the first record supplies the target vertex id and the first message
+                       final Tuple2<K, Either<NullValue, Message>> head = messageIterator.next();
+                       final K vertexID = head.f0;
+
+                       final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+                       messageIter.setFirst(head.f1.right());
+
+                       @SuppressWarnings("unchecked")
+                       Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter =
+                                       (Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) messageIterator;
+                       messageIter.setSource(downcastIter);
+
+                       combinerFunction.set(vertexID, out);
+                       combinerFunction.combineMessages(messageIter);
+               }
+
+               /** Combining uses exactly the same logic as the final reduce. */
+               @Override
+               public void combine(Iterable<Tuple2<K, Either<NullValue, Message>>> values,
+                               Collector<Tuple2<K, Either<NullValue, Message>>> out) throws Exception {
+                       reduce(values, out);
+               }
+       }
+
+
+       // 
--------------------------------------------------------------------------------------------
+       //  UTIL methods
+       // 
--------------------------------------------------------------------------------------------
+
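+       /**
+        * Helper method which applies the iteration configuration (name, parallelism,
+        * solution set memory mode and aggregators) to the given iteration, or only a
+        * default name if no configuration was provided.
+        *
+        * @param iteration the delta iteration to set up
+        */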
+
+       private void setUpIteration(DeltaIteration<?, ?> iteration) {
+
+               // name used directly when unconfigured, or passed to the configuration's getName otherwise
+               final String defaultName = "Vertex-centric iteration (" + computeFunction + ")";
+
+               if (this.configuration == null) {
+                       // no configuration provided; set default name
+                       iteration.name(defaultName);
+                       return;
+               }
+
+               // set up the iteration operator
+               iteration.name(this.configuration.getName(defaultName));
+               iteration.parallelism(this.configuration.getParallelism());
+               iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+               // register all aggregators
+               for (Map.Entry<String, Aggregator<?>> aggregator : this.configuration.getAggregators().entrySet()) {
+                       iteration.registerAggregator(aggregator.getKey(), aggregator.getValue());
+               }
+       }
+
+       /**
+        * Join function that pairs a vertex from the solution set with the payload
+        * (field f1) of a matching workset record.
+        */
+       @SuppressWarnings("serial")
+       @ForwardedFieldsFirst("*->f0")
+       @ForwardedFieldsSecond("f1->f1")
+       private static final class AppendVertexState<K, VV, Message> implements
+               FlatJoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>,
+               Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> {
+
+               // single output record, reused for every emitted pair
+               private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple =
+                               new Tuple2<Vertex<K, VV>, Either<NullValue, Message>>();
+
+               @Override
+               public void join(Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message,
+                               Collector<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> out) {
+
+                       outTuple.f0 = vertex;
+                       outTuple.f1 = message.f1;
+                       out.collect(outTuple);
+               }
+       }
+
+       /**
+        * Keeps only the Left side (updated vertices) of the compute function's output.
+        */
+       @SuppressWarnings("serial")
+       private static final class ProjectNewVertexValue<K, VV, Message> implements
+               FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Vertex<K, VV>> {
+
+               @Override
+               public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value,
+                               Collector<Vertex<K, VV>> out) {
+
+                       // Right values are messages; they are dropped here
+                       if (!value.isLeft()) {
+                               return;
+                       }
+                       out.collect(value.left());
+               }
+       }
+
+       /**
+        * Keeps only the Right side (messages) of the compute function's output and
+        * re-wraps each one as a workset record (targetId, Right(message)).
+        */
+       @SuppressWarnings("serial")
+       private static final class ProjectMessages<K, VV, Message> implements
+               FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> {
+
+               // single output record, reused for every emitted message
+               private Tuple2<K, Either<NullValue, Message>> outTuple =
+                               new Tuple2<K, Either<NullValue, Message>>();
+
+               @Override
+               public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value,
+                               Collector<Tuple2<K, Either<NullValue, Message>>> out) {
+
+                       // Left values are vertex updates; they are dropped here
+                       if (!value.isRight()) {
+                               return;
+                       }
+
+                       final Tuple2<K, Message> addressed = value.right();
+                       outTuple.f0 = addressed.f0;
+                       outTuple.f1 = Either.Right(addressed.f1);
+                       out.collect(outTuple);
+               }
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index d25c294..e12d779 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -72,7 +72,7 @@ public abstract class MessagingFunction<K, VV, Message, EV> 
implements Serializa
        private EdgeDirection direction;
 
        /**
-        * Retrieves the edge direction in which messages are propagated in the 
vertex-centric iteration.
+        * Retrieves the edge direction in which messages are propagated in the 
scatter-gather iteration.
         * @return the messaging {@link EdgeDirection}
         */
        public EdgeDirection getDirection() {
@@ -98,14 +98,14 @@ public abstract class MessagingFunction<K, VV, Message, EV> 
implements Serializa
        public abstract void sendMessages(Vertex<K, VV> vertex) throws 
Exception;
        
        /**
-        * This method is executed one per superstep before the vertex update 
function is invoked for each vertex.
+        * This method is executed once per superstep before the vertex update 
function is invoked for each vertex.
         * 
         * @throws Exception Exceptions in the pre-superstep phase cause the 
superstep to fail.
         */
        public void preSuperstep() throws Exception {}
        
        /**
-        * This method is executed one per superstep after the vertex update 
function has been invoked for each vertex.
+        * This method is executed once per superstep after the vertex update 
function has been invoked for each vertex.
         * 
         * @throws Exception Exceptions in the post-superstep phase cause the 
superstep to fail.
         */
@@ -125,7 +125,7 @@ public abstract class MessagingFunction<K, VV, Message, EV> 
implements Serializa
        @SuppressWarnings("unchecked")
        public Iterable<Edge<K, EV>> getEdges() {
                if (edgesUsed) {
-                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
+                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
                }
                edgesUsed = true;
                this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);

Reply via email to