[FLINK-3207] [gelly] add pregel iteration abstraction to gelly
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5ffb5dd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5ffb5dd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5ffb5dd Branch: refs/heads/master Commit: c5ffb5ddd06b62f639145f8698c71fc74ff8c9b6 Parents: 35892ed Author: vasia <[email protected]> Authored: Tue Feb 2 12:13:51 2016 +0100 Committer: vasia <[email protected]> Committed: Mon Mar 21 19:10:29 2016 +0100 ---------------------------------------------------------------------- .../flink/graph/pregel/ComputeFunction.java | 266 ++++++++++ .../flink/graph/pregel/MessageCombiner.java | 71 +++ .../flink/graph/pregel/MessageIterator.java | 77 +++ .../pregel/VertexCentricConfiguration.java | 64 +++ .../graph/pregel/VertexCentricIteration.java | 504 +++++++++++++++++++ .../flink/graph/spargel/MessagingFunction.java | 8 +- 6 files changed, 986 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java new file mode 100644 index 0000000..09a000f --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.pregel; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.flink.api.common.aggregators.Aggregator; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.Value; +import org.apache.flink.util.Collector; + +/** + * The base class for the message-passing functions between vertices as a part of a {@link VertexCentricIteration}. + * + * @param <K> The type of the vertex key (the vertex identifier). + * @param <VV> The type of the vertex value (the state of the vertex). + * @param <EV> The type of the values that are associated with the edges. + * @param <Message> The type of the message sent between vertices along the edges. 
+ */
+public abstract class ComputeFunction<K, VV, EV, Message> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This method is invoked once per superstep, for each active vertex.
+	 * A vertex is active during a superstep, if at least one message was produced for it,
+	 * in the previous superstep. During the first superstep, all vertices are active.
+	 * <p>
+	 * This method can iterate over all received messages, set the new vertex value, and
+	 * send messages to other vertices (which will be delivered in the next superstep).
+	 * In the first superstep, the message iterator is empty.
+	 *
+	 * @param vertex The vertex executing this function
+	 * @param messages The messages that were sent to this vertex in the previous superstep
+	 * @throws Exception Any exception thrown here is propagated to the enclosing operator.
+	 */
+	public abstract void compute(Vertex<K, VV> vertex, MessageIterator<Message> messages) throws Exception;
+
+	/**
+	 * This method is executed once per superstep before {@link #compute(Vertex, MessageIterator)}
+	 * is invoked for any vertex.
+	 *
+	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+
+	/**
+	 * This method is executed once per superstep after {@link #compute(Vertex, MessageIterator)}
+	 * has been invoked for each vertex.
+	 *
+	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+
+	/**
+	 * Gets an {@link java.lang.Iterable} with all out-going edges of the current vertex. This method is
+	 * mutually exclusive with {@link #sendMessageToAllNeighbors(Object)} and may be called only once
+	 * per {@link #compute(Vertex, MessageIterator)} invocation.
+	 * <p>
+	 * The returned iterable is its own iterator, so it can be traversed only once. It reuses a single
+	 * {@link Edge} instance for every element, so callers must copy an edge if they need to keep it.
+	 *
+	 * @return An iterable over all out-going edges of the current vertex.
+	 * @throws IllegalStateException If this method or {@link #sendMessageToAllNeighbors(Object)}
+	 *         has already been called for the current vertex.
+	 */
+	public final Iterable<Edge<K, EV>> getEdges() {
+		if (edgesUsed) {
+			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+		}
+		edgesUsed = true;
+		this.edgeIterator.set(edges);
+		return this.edgeIterator;
+	}
+
+	/**
+	 * Sends the given message to all vertices that are targets of an out-going edge of the current vertex.
+	 * This method is mutually exclusive with {@link #getEdges()} and may be called only once per
+	 * {@link #compute(Vertex, MessageIterator)} invocation.
+	 *
+	 * @param m The message to send.
+	 * @throws IllegalStateException If this method or {@link #getEdges()} has already been called
+	 *         for the current vertex.
+	 */
+	public final void sendMessageToAllNeighbors(Message m) {
+		if (edgesUsed) {
+			// keep the separating space so the message reads "... 'sendMessageToAllNeighbors()' exactly once."
+			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' "
+				+ "exactly once.");
+		}
+
+		edgesUsed = true;
+
+		// the output tuple is reused: the message (field 1) is fixed, the target id (field 0) changes per edge
+		outMsg.setField(m, 1);
+
+		while (edges.hasNext()) {
+			Tuple next = edges.next();
+			// field 1 of an edge holds its target vertex id
+			outMsg.setField(next.getField(1), 0);
+			out.collect(Either.Right(outMsg));
+		}
+	}
+
+	/**
+	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+	 * the next superstep will cause an exception due to a non-deliverable message.
+	 *
+	 * @param target The key (id) of the target vertex to message.
+	 * @param m The message.
+	 */
+	public final void sendMessageTo(K target, Message m) {
+
+		outMsg.setField(target, 0);
+		outMsg.setField(m, 1);
+
+		out.collect(Either.Right(outMsg));
+	}
+
+	/**
+	 * Sets the new value of this vertex.
+	 * <p>
+	 * This should be called at most once per {@link #compute(Vertex, MessageIterator)} invocation.
+	 *
+	 * @param newValue The new vertex value.
+	 * @throws IllegalStateException If the new value has already been set for the current vertex.
+	 */
+	public final void setNewVertexValue(VV newValue) {
+		if (setNewVertexValueCalled) {
+			throw new IllegalStateException("setNewVertexValue should only be called at most once per compute call");
+		}
+		setNewVertexValueCalled = true;
+
+		outVertex.setField(newValue, 1);
+
+		out.collect(Either.Left(outVertex));
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the number of the superstep, starting at <tt>1</tt>.
+	 *
+	 * @return The number of the current superstep.
+	 */
+	public final int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public final <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public final <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function. They can be registered via
+	 * {@link VertexCentricConfiguration#addBroadcastSet(String, org.apache.flink.api.java.DataSet)}.
+	 *
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public final <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods and state
+	// --------------------------------------------------------------------------------------------
+
+	/** Reused output record for vertex value updates; field 0 holds the current vertex id. */
+	private Vertex<K, VV> outVertex;
+
+	/** Reused output record for messages: (target vertex id, message). */
+	private Tuple2<K, Message> outMsg;
+
+	private IterationRuntimeContext runtimeContext;
+
+	/** The out-going edges of the vertex currently being computed. */
+	private Iterator<Edge<K, EV>> edges;
+
+	/** Receives both vertex value updates (Left) and messages (Right). */
+	private Collector<Either<?, ?>> out;
+
+	private EdgesIterator<K, EV> edgeIterator;
+
+	/** Whether getEdges() or sendMessageToAllNeighbors() has been called for the current vertex. */
+	private boolean edgesUsed;
+
+	/** Whether setNewVertexValue() has been called for the current vertex. */
+	private boolean setNewVertexValueCalled;
+
+	/**
+	 * Binds this function to the iteration runtime context and allocates the reusable output objects.
+	 */
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+		this.outVertex = new Vertex<K, VV>();
+		this.outMsg = new Tuple2<K, Message>();
+		this.edgeIterator = new EdgesIterator<K, EV>();
+	}
+
+	/**
+	 * Prepares this function for a {@link #compute(Vertex, MessageIterator)} call on the given vertex:
+	 * stores the vertex id in the update record, remembers the edges and the collector, and resets the
+	 * per-vertex usage flags.
+	 */
+	@SuppressWarnings("unchecked")
+	void set(K vertexId, Iterator<Edge<K, EV>> edges,
+			Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) {
+
+		this.outVertex.setField(vertexId, 0);
+		this.edges = edges;
+		this.out = (Collector<Either<?, ?>>) (Collector<?>) out;
+		this.edgesUsed = false;
+		this.setNewVertexValueCalled = false;
+	}
+
+	/**
+	 * Adapter that exposes an edge iterator as both {@link Iterator} and {@link Iterable}, copying each
+	 * input edge into a single reused {@link Edge} instance.
+	 */
+	private static final class EdgesIterator<K, EV>
+		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
+	{
+		private Iterator<Edge<K, EV>> input;
+
+		private Edge<K, EV> edge = new Edge<K, EV>();
+
+		void set(Iterator<Edge<K, EV>> input) {
+			this.input = input;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return input.hasNext();
+		}
+
+		@Override
+		public Edge<K, EV> next() {
+			Edge<K, EV> next = input.next();
+			edge.setSource(next.f0);
+			edge.setTarget(next.f1);
+			edge.setValue(next.f2);
+			return edge;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Iterator<Edge<K, EV>> iterator() {
+			return this;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java new file mode 100644 index 0000000..9458323 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.pregel; + +import java.io.Serializable; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +/** + * The base class for combining messages sent during a {@link VertexCentricIteration}. + * + * @param <K> The type of the vertex id + * @param <Message> The type of the message sent between vertices along the edges.
+ */
+public abstract class MessageCombiner<K, Message> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private Collector<Tuple2<K, Either<NullValue, Message>>> out;
+
+	private K vertexId;
+
+	/** Output record (target vertex id, combined message); reused by every sendCombinedMessage call. */
+	private Tuple2<K, Either<NullValue, Message>> outValue;
+
+	/**
+	 * Prepares this combiner for the messages addressed to one target vertex.
+	 *
+	 * @param target The id of the vertex the messages are addressed to.
+	 * @param collector The collector that receives the combined messages.
+	 */
+	void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) {
+		this.vertexId = target;
+		this.out = collector;
+		this.outValue = new Tuple2<K, Either<NullValue, Message>>();
+		outValue.setField(vertexId, 0);
+	}
+
+	/**
+	 * Combines messages sent from different vertices to the same target vertex.
+	 * Implementing this method might reduce communication costs during a vertex-centric
+	 * iteration. The result is emitted with {@link #sendCombinedMessage(Object)}.
+	 *
+	 * @param messages the input messages to combine
+	 * @throws Exception Any exception thrown here is propagated to the enclosing operator.
+	 */
+	public abstract void combineMessages(MessageIterator<Message> messages) throws Exception;
+
+	/**
+	 * Sends the combined message to the target vertex.
+	 * <p>
+	 * Each call emits one message for the current target vertex.
+	 *
+	 * @param combinedMessage The combined message to send.
+	 */
+	public final void sendCombinedMessage(Message combinedMessage) {
+		outValue.setField(Either.Right(combinedMessage), 1);
+		out.collect(outValue);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
new file mode 100644
index 0000000..620cb93
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.pregel;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.types.NullValue;
+
+/**
+ * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
+ * the <i>foreach</i> syntax.
+ * <p>
+ * Messages come from an optional first message (see {@code setFirst}) followed by the right-hand side
+ * of the {@link Either} field of each tuple from an optional source iterator (see {@code setSource}).
+ */
+public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private transient Iterator<Tuple2<?, Either<NullValue, Message>>> source;
+	private Message first = null;
+
+	/** Sets the iterator whose tuples provide the messages returned after the first message. */
+	final void setSource(Iterator<Tuple2<?, Either<NullValue, Message>>> source) {
+		this.source = source;
+	}
+
+	/** Sets a message that is returned before any message of the source iterator. */
+	final void setFirst(Message msg) {
+		this.first = msg;
+	}
+
+	@Override
+	public final boolean hasNext() {
+		if (first != null) {
+			return true;
+		}
+		else if (this.source != null) {
+			return this.source.hasNext();
+		}
+		else {
+			return false;
+		}
+	}
+
+	/**
+	 * Returns the next message.
+	 *
+	 * @throws NoSuchElementException If there are no more messages (including when no source is set).
+	 */
+	@Override
+	public final Message next() {
+		if (first != null) {
+			Message toReturn = first;
+			first = null;
+			return toReturn;
+		}
+		// honour the Iterator contract instead of failing with a NullPointerException (e.g. in superstep 1)
+		if (this.source == null) {
+			throw new NoSuchElementException();
+		}
+		return this.source.next().f1.right();
+	}
+
+	@Override
+	public final void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	/** Returns this instance; the messages can therefore be traversed only once. */
+	@Override
+	public Iterator<Message> iterator() {
+		return this;
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java new file mode 100644 index 0000000..287f20d --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.pregel; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.IterationConfiguration; + +import java.util.ArrayList; +import java.util.List; + +/** + * A VertexCentricConfiguration object can be used to set the iteration name and + * degree of parallelism, to register aggregators and use broadcast sets in + * the {@link org.apache.flink.graph.pregel.ComputeFunction}. 
+ *
+ * <p>
+ * The VertexCentricConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runVertexCentricIteration(org.apache.flink.graph.pregel.ComputeFunction, int, VertexCentricConfiguration)}.
+ */
+public class VertexCentricConfiguration extends IterationConfiguration {
+
+	/** the broadcast variables for the compute function */
+	private final List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<Tuple2<String, DataSet<?>>>();
+
+	public VertexCentricConfiguration() {}
+
+	/**
+	 * Adds a data set as a broadcast set to the compute function.
+	 *
+	 * @param name The name under which the broadcast data is available in the compute function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSet(String name, DataSet<?> data) {
+		this.bcVars.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Get the broadcast variables of the compute function.
+	 * <p>
+	 * The returned list is the live list held by this configuration, not a copy.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getBcastVars() {
+		return this.bcVars;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
new file mode 100644
index 0000000..c79ace7
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.pregel; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.flink.api.common.aggregators.Aggregator; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.operators.CoGroupOperator; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.CustomUnaryOperation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +import com.google.common.base.Preconditions; + +/** + * This class represents iterative graph computations, programmed in a vertex-centric perspective. + * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been + * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>. + * <p> + * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The + * algorithms send messages along the edges and update the state of vertices based on + * the old state and the incoming messages. All vertices have an initial state. + * The computation terminates once no vertex receives any message anymore. + * Additionally, a maximum number of iterations (supersteps) may be specified. + * <p> + * The computation is here represented by one function: + * <ul> + * <li>The {@link ComputeFunction} receives incoming messages, may update the state for + * the vertex, and sends messages along the edges of the vertex. + * </li> + * </ul> + * <p> + * + * Vertex-centric graph iterations are run by calling + * {@link Graph#runVertexCentricIteration(ComputeFunction, int)}. + * + * @param <K> The type of the vertex key (the vertex identifier). + * @param <VV> The type of the vertex value (the state of the vertex). + * @param <Message> The type of the message sent between vertices along the edges. + * @param <EV> The type of the values that are associated with the edges.
+ */ +public class VertexCentricIteration<K, VV, EV, Message> + implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> { + + private final ComputeFunction<K, VV, EV, Message> computeFunction; + + private final MessageCombiner<K, Message> combineFunction; + + private final DataSet<Edge<K, EV>> edgesWithValue; + + private final int maximumNumberOfIterations; + + private final TypeInformation<Message> messageType; + + private DataSet<Vertex<K, VV>> initialVertices; + + private VertexCentricConfiguration configuration; + + // ---------------------------------------------------------------------------------- + + private VertexCentricIteration(ComputeFunction<K, VV, EV, Message> cf, + DataSet<Edge<K, EV>> edgesWithValue, MessageCombiner<K, Message> mc, + int maximumNumberOfIterations) { + + Preconditions.checkNotNull(cf); + Preconditions.checkNotNull(edgesWithValue); + Preconditions.checkArgument(maximumNumberOfIterations > 0, + "The maximum number of iterations must be at least one."); + + this.computeFunction = cf; + this.edgesWithValue = edgesWithValue; + this.combineFunction = mc; + this.maximumNumberOfIterations = maximumNumberOfIterations; + this.messageType = getMessageType(cf); + } + + private TypeInformation<Message> getMessageType(ComputeFunction<K, VV, EV, Message> cf) { + return TypeExtractor.createTypeInfo(cf, ComputeFunction.class, cf.getClass(), 3); + } + + // -------------------------------------------------------------------------------------------- + // Custom Operator behavior + // -------------------------------------------------------------------------------------------- + + /** + * Sets the input data set for this operator. In the case of this operator this input data set represents + * the set of vertices with their initial state. + * + * @param inputData The input data set, which in the case of this operator represents the set of + * vertices with their initial state. 
+ * + * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet) + */ + @Override + public void setInput(DataSet<Vertex<K, VV>> inputData) { + this.initialVertices = inputData; + } + + /** + * Creates the operator that represents this vertex-centric graph computation. + * + * @return The operator that represents this vertex-centric graph computation. + */ + @Override + public DataSet<Vertex<K, VV>> createResult() { + if (this.initialVertices == null) { + throw new IllegalStateException("The input data set has not been set."); + } + + // prepare the type information + TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0); + TypeInformation<Tuple2<K, Message>> messageTypeInfo = + new TupleTypeInfo<Tuple2<K, Message>>(keyType, messageType); + TypeInformation<Vertex<K, VV>> vertexType = initialVertices.getType(); + TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> intermediateTypeInfo = + new EitherTypeInfo<Vertex<K, VV>, Tuple2<K, Message>>(vertexType, messageTypeInfo); + TypeInformation<Either<NullValue, Message>> nullableMsgTypeInfo = + new EitherTypeInfo<NullValue, Message>(TypeExtractor.getForClass(NullValue.class), messageType); + TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo = + new TupleTypeInfo<Tuple2<K, Either<NullValue, Message>>>(keyType, nullableMsgTypeInfo); + + DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = initialVertices.map( + new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo); + + final DeltaIteration<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> iteration = + initialVertices.iterateDelta(initialWorkSet, this.maximumNumberOfIterations, 0); + setUpIteration(iteration); + + // join with the current state to get vertex values + DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs = + iteration.getSolutionSet().join(iteration.getWorkset()) + .where(0).equalTo(0) + .with(new 
AppendVertexState<K, VV, Message>()) + .returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>( + vertexType, nullableMsgTypeInfo)); + + VertexComputeUdf<K, VV, EV, Message> vertexUdf = + new VertexComputeUdf<K, VV, EV, Message>(computeFunction, intermediateTypeInfo); + + CoGroupOperator<?, ?, Either<Vertex<K, VV>, Tuple2<K, Message>>> superstepComputation = + verticesWithMsgs.coGroup(edgesWithValue) + .where("f0.f0").equalTo(0) + .with(vertexUdf); + + // compute the solution set delta + DataSet<Vertex<K, VV>> solutionSetDelta = superstepComputation.flatMap( + new ProjectNewVertexValue<K, VV, Message>()).returns(vertexType); + + // compute the inbox of each vertex for the next superstep (new workset) + DataSet<Tuple2<K, Either<NullValue, Message>>> allMessages = superstepComputation.flatMap( + new ProjectMessages<K, VV, Message>()).returns(workSetTypeInfo); + + DataSet<Tuple2<K, Either<NullValue, Message>>> newWorkSet = allMessages; + + // check if a combiner has been provided + if (combineFunction != null) { + + MessageCombinerUdf<K, Message> combinerUdf = + new MessageCombinerUdf<K, Message>(combineFunction, workSetTypeInfo); + + DataSet<Tuple2<K, Either<NullValue, Message>>> combinedMessages = allMessages + .groupBy(0).reduceGroup(combinerUdf) + .setCombinable(true); + + newWorkSet = combinedMessages; + } + + // configure the compute function + superstepComputation = superstepComputation.name("Compute Function"); + if (this.configuration != null) { + for (Tuple2<String, DataSet<?>> e : this.configuration.getBcastVars()) { + superstepComputation = superstepComputation.withBroadcastSet(e.f1, e.f0); + } + } + + return iteration.closeWith(solutionSetDelta, newWorkSet); + } + + /** + * Creates a new vertex-centric iteration operator. + * + * @param edgesWithValue The data set containing edges. + * @param uf The function that updates the state of the vertices from the incoming messages. 
+ * @param mf The function that turns changed vertex states into messages along the edges. + * + * @param <K> The type of the vertex key (the vertex identifier). + * @param <VV> The type of the vertex value (the state of the vertex). + * @param <Message> The type of the message sent between vertices along the edges. + * @param <EV> The type of the values that are associated with the edges. + * + * @return An in stance of the vertex-centric graph computation operator. + */ + public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges( + DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf, + int maximumNumberOfIterations) { + + return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, null, + maximumNumberOfIterations); + } + + /** + * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as + * a weight or distance). + * + * @param edgesWithValue The data set containing edges. + * @param uf The function that updates the state of the vertices from the incoming messages. + * @param mf The function that turns changed vertex states into messages along the edges. + * @param mc The function that combines messages sent to a vertex during a superstep. + * + * @param <K> The type of the vertex key (the vertex identifier). + * @param <VV> The type of the vertex value (the state of the vertex). + * @param <Message> The type of the message sent between vertices along the edges. + * @param <EV> The type of the values that are associated with the edges. + * + * @return An in stance of the vertex-centric graph computation operator. 
+	 */
+	public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
+		DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf,
+		MessageCombiner<K, Message> mc, int maximumNumberOfIterations) {
+
+		return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, mc,
+			maximumNumberOfIterations);
+	}
+
+	/**
+	 * Configures this vertex-centric iteration with the provided parameters.
+	 *
+	 * @param parameters the configuration parameters
+	 */
+	public void configure(VertexCentricConfiguration parameters) {
+		this.configuration = parameters;
+	}
+
+	/**
+	 * Returns the configuration set via {@link #configure(VertexCentricConfiguration)}.
+	 *
+	 * @return the configuration parameters of this vertex-centric iteration; may be
+	 *         {@code null} when no configuration was provided
+	 */
+	public VertexCentricConfiguration getIterationConfiguration() {
+		return this.configuration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Maps every vertex to a {@code (vertexId, Left(NullValue))} placeholder tuple.
+	 * Judging by its name, this is meant to seed the iteration's initial workset
+	 * (NOTE(review): confirm against the caller).
+	 */
+	@SuppressWarnings("serial")
+	private static class InitializeWorkSet<K, VV, Message> extends
+		RichMapFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> {
+
+		private Tuple2<K, Either<NullValue, Message>> outTuple;
+		private Either<NullValue, Message> nullMessage;
+
+		@Override
+		public void open(Configuration parameters) {
+			outTuple = new Tuple2<K, Either<NullValue, Message>>();
+			nullMessage = Either.Left(NullValue.getInstance());
+		}
+
+		@Override
+		public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) {
+			// the same output tuple and placeholder are reused for every record
+			outTuple.setField(vertex.getId(), 0);
+			outTuple.setField(nullMessage, 1);
+			return outTuple;
+		}
+	}
+
+	/**
+	 * Wraps the user's {@link ComputeFunction}. It co-groups the vertex state (paired with its
+	 * messages) with the vertex's edges and calls {@code compute} once for every vertex that
+	 * appears in the first input.
+	 *
+	 * <p>The output is an {@link Either}: {@code Left} carries vertex values and {@code Right}
+	 * carries {@code (key, message)} tuples to be sent.
+	 */
+	@SuppressWarnings("serial")
+	private static class VertexComputeUdf<K, VV, EV, Message> extends RichCoGroupFunction<
+		Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>,
+		Either<Vertex<K, VV>, Tuple2<K, Message>>>
+		implements ResultTypeQueryable<Either<Vertex<K, VV>, Tuple2<K, Message>>> {
+
+		private final ComputeFunction<K, VV, EV, Message> computeFunction;
+		private transient TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> resultType;
+
+		private VertexComputeUdf(ComputeFunction<K, VV, EV, Message> compute,
+			TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> typeInfo) {
+
+			this.computeFunction = compute;
+			this.resultType = typeInfo;
+		}
+
+		@Override
+		public TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> getProducedType() {
+			return this.resultType;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// the compute function is handed the iteration context only in the first superstep
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.computeFunction.init(getIterationRuntimeContext());
+			}
+			this.computeFunction.preSuperstep();
+		}
+
+		@Override
+		public void close() throws Exception {
+			this.computeFunction.postSuperstep();
+		}
+
+		@Override
+		public void coGroup(
+			Iterable<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> messages,
+			Iterable<Edge<K, EV>> edgesIterator,
+			Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) throws Exception {
+
+			final Iterator<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> vertexIter =
+				messages.iterator();
+
+			// groups that only contain edges (no vertex/message entry) are not computed
+			if (vertexIter.hasNext()) {
+
+				final Tuple2<Vertex<K, VV>, Either<NullValue, Message>> first = vertexIter.next();
+				final Vertex<K, VV> vertexState = first.f0;
+				final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+
+				// In the first superstep the entries carry no messages (presumably the
+				// Left(NullValue) placeholders), so the iterator is left without a source.
+				// From the second superstep on, the entries are expected to hold Right messages.
+				if (getIterationRuntimeContext().getSuperstepNumber() != 1) {
+					messageIter.setFirst(first.f1.right());
+
+					// widen the element type to what MessageIterator.setSource accepts
+					@SuppressWarnings("unchecked")
+					Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter =
+						(Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) vertexIter;
+					messageIter.setSource(downcastIter);
+				}
+
+				computeFunction.set(vertexState.getId(), edgesIterator.iterator(), out);
+				computeFunction.compute(vertexState, messageIter);
+			}
+		}
+	}
+
+	/**
+	 * Wraps the user's {@link MessageCombiner}. It serves as the group-reduce function and,
+	 * through {@link GroupCombineFunction}, as its combiner. The key handed to the combiner
+	 * is read from {@code f0} of the group's first element.
+	 */
+	@SuppressWarnings("serial")
+	@ForwardedFields("f0")
+	public static class MessageCombinerUdf<K, Message> extends RichGroupReduceFunction<
+		Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>>
+		implements ResultTypeQueryable<Tuple2<K, Either<NullValue, Message>>>,
+		GroupCombineFunction<Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> {
+
+		final MessageCombiner<K, Message> combinerFunction;
+		private transient TypeInformation<Tuple2<K, Either<NullValue, Message>>> resultType;
+
+		private MessageCombinerUdf(MessageCombiner<K, Message> combineFunction,
+			TypeInformation<Tuple2<K, Either<NullValue, Message>>> messageTypeInfo) {
+
+			this.combinerFunction = combineFunction;
+			this.resultType = messageTypeInfo;
+		}
+
+		@Override
+		public TypeInformation<Tuple2<K, Either<NullValue, Message>>> getProducedType() {
+			return resultType;
+		}
+
+		@Override
+		public void reduce(Iterable<Tuple2<K, Either<NullValue, Message>>> messages,
+			Collector<Tuple2<K, Either<NullValue, Message>>> out) throws Exception {
+
+			final Iterator<Tuple2<K, Either<NullValue, Message>>> messageIterator = messages.iterator();
+
+			if (messageIterator.hasNext()) {
+
+				final Tuple2<K, Either<NullValue, Message>> first = messageIterator.next();
+				final K vertexID = first.f0;
+				final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+				messageIter.setFirst(first.f1.right());
+
+				// widen the element type to what MessageIterator.setSource accepts
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter =
+					(Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) messageIterator;
+				messageIter.setSource(downcastIter);
+
+				combinerFunction.set(vertexID, out);
+				combinerFunction.combineMessages(messageIter);
+			}
+		}
+
+		@Override
+		public void combine(Iterable<Tuple2<K, Either<NullValue, Message>>> values,
+			Collector<Tuple2<K, Either<NullValue, Message>>> out) throws Exception {
+			// the pre-aggregation step performs exactly the same work as the reduce step
+			this.reduce(values, out);
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  UTIL methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Applies this iteration's configuration to the given delta iteration. When a configuration
+	 * is present, it sets the name, parallelism and solution-set memory mode, and registers the
+	 * aggregators. Otherwise only the default name is set.
+	 *
+	 * @param iteration the delta iteration operator to configure
+	 */
+	private void setUpIteration(DeltaIteration<?, ?> iteration) {
+
+		final String defaultName = "Vertex-centric iteration (" + computeFunction + ")";
+
+		// set up the iteration operator
+		if (this.configuration != null) {
+
+			iteration.name(this.configuration.getName(defaultName));
+			iteration.parallelism(this.configuration.getParallelism());
+			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+			// register all aggregators
+			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+				iteration.registerAggregator(entry.getKey(), entry.getValue());
+			}
+		}
+		else {
+			// no configuration provided; set default name
+			iteration.name(defaultName);
+		}
+	}
+
+	/**
+	 * Pairs a vertex with the {@code Either} payload ({@code f1}) of a joined
+	 * {@code (key, Either)} tuple, emitting {@code (vertex, payload)}.
+	 */
+	@SuppressWarnings("serial")
+	@ForwardedFieldsFirst("*->f0")
+	@ForwardedFieldsSecond("f1->f1")
+	private static final class AppendVertexState<K, VV, Message> implements
+		FlatJoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>,
+		Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> {
+
+		private final Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple =
+			new Tuple2<Vertex<K, VV>, Either<NullValue, Message>>();
+
+		@Override
+		public void join(Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message,
+			Collector<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> out) {
+
+			outTuple.setField(vertex, 0);
+			outTuple.setField(message.f1, 1);
+			out.collect(outTuple);
+		}
+	}
+
+	/**
+	 * Keeps only the vertex values ({@code Left}) from the compute output.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ProjectNewVertexValue<K, VV, Message> implements
+		FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Vertex<K, VV>> {
+
+		@Override
+		public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value,
+			Collector<Vertex<K, VV>> out) {
+
+			if (value.isLeft()) {
+				out.collect(value.left());
+			}
+		}
+	}
+
+	/**
+	 * Keeps only the {@code (key, message)} tuples ({@code Right}) from the compute output.
+	 * Each one is re-wrapped as {@code (key, Right(message))}, the same tuple type that
+	 * {@code InitializeWorkSet} emits.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ProjectMessages<K, VV, Message> implements
+		FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> {
+
+		private final Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<K, Either<NullValue, Message>>();
+
+		@Override
+		public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value,
+			Collector<Tuple2<K, Either<NullValue, Message>>> out) {
+
+			if (value.isRight()) {
+				Tuple2<K, Message> message = value.right();
+				outTuple.setField(message.f0, 0);
+				outTuple.setField(Either.Right(message.f1), 1);
+				out.collect(outTuple);
+			}
+		}
+	}
+
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/c5ffb5dd/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java index d25c294..e12d779 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java @@ -72,7 +72,7 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa private EdgeDirection direction; /** - * Retrieves the edge direction in which messages are propagated in the vertex-centric iteration. + * Retrieves the edge direction in which messages are propagated in the scatter-gather iteration.
* @return the messaging {@link EdgeDirection} */ public EdgeDirection getDirection() { @@ -98,14 +98,14 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception; /** - * This method is executed one per superstep before the vertex update function is invoked for each vertex. + * This method is executed once per superstep before the vertex update function is invoked for each vertex. * * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. */ public void preSuperstep() throws Exception {} /** - * This method is executed one per superstep after the vertex update function has been invoked for each vertex. + * This method is executed once per superstep after the vertex update function has been invoked for each vertex. * * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. */ @@ -125,7 +125,7 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa @SuppressWarnings("unchecked") public Iterable<Edge<K, EV>> getEdges() { if (edgesUsed) { - throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once."); + throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once."); } edgesUsed = true; this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
