[
https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501354#comment-14501354
]
ASF GitHub Bot commented on FLINK-1523:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/537#discussion_r28643926
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---
@@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
     if (this.initialVertices == null) {
         throw new IllegalStateException("The input data set has not been set.");
     }
-
+
     // prepare some type information
-    TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
     TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
     TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
-    final int[] zeroKeyPos = new int[] {0};
-
-    final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
-        this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
+    // create a graph
+    Graph<VertexKey, VertexValue, EdgeValue> graph =
+        Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment());
-    // set up the iteration operator
-    if (this.configuration != null) {
+    // check whether the numVertices option is set and, if so, compute the total number of vertices
+    // and set it within the messaging and update functions
-        iteration.name(this.configuration.getName(
-            "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
-        iteration.parallelism(this.configuration.getParallelism());
-        iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
-        // register all aggregators
-        for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
-            iteration.registerAggregator(entry.getKey(), entry.getValue());
+    if (this.configuration != null && this.configuration.isOptNumVertices()) {
+        try {
+            long numberOfVertices = graph.numberOfVertices();
+            messagingFunction.setNumberOfVertices(numberOfVertices);
+            updateFunction.setNumberOfVertices(numberOfVertices);
+        } catch (Exception e) {
+            e.printStackTrace();
         }
     }
-    else {
-        // no configuration provided; set default name
-        iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
-    }
-
-    // build the messaging function (co group)
-    CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-    MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
-    messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-
-    // configure coGroup message function with name and broadcast variables
-    messages = messages.name("Messaging");
-    if (this.configuration != null) {
-        for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
-            messages = messages.withBroadcastSet(e.f1, e.f0);
-        }
+    if(this.configuration != null) {
+        messagingFunction.setDirection(this.configuration.getDirection());
+    } else {
+        messagingFunction.setDirection(EdgeDirection.OUT);
     }
-
-    VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-
-    // build the update function (co group)
-    CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
-        messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-
-    // configure coGroup update function with name and broadcast variables
-    updates = updates.name("Vertex State Updates");
-    if (this.configuration != null) {
-        for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
-            updates = updates.withBroadcastSet(e.f1, e.f0);
-        }
-    }
+    // retrieve the direction in which the updates are made and in which the messages are sent
+    EdgeDirection messagingDirection = messagingFunction.getDirection();
-    // let the operator know that we preserve the key field
-    updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-
-    return iteration.closeWith(updates, updates);
-
+    DataSet<Tuple2<VertexKey, Message>> messages = null;
--- End diff --
Why create a null dataset and pass it as a parameter to the methods below?
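
One possible way to avoid the null placeholder, sketched in plain Java with no Flink dependency: let a helper return the result for the chosen direction, so the variable is assigned exactly once. Everything here is an assumption made for illustration and is not the PR's code: the `Edge` class, the `buildMessageTargets` helper, the plain lists standing in for DataSets, and the locally defined `EdgeDirection` enum (the diff only shows `EdgeDirection.OUT`; `IN` and `ALL` follow the "in/out/all" wording of the issue).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MessagesByDirection {

    // Local stand-in for the direction type; not Gelly's own enum.
    enum EdgeDirection { IN, OUT, ALL }

    // Hypothetical directed edge from source to target.
    static final class Edge {
        final int source;
        final int target;

        Edge(int source, int target) {
            this.source = source;
            this.target = target;
        }
    }

    // Returns the vertex ids that would receive a message for the given
    // direction. The caller gets a fully built list back instead of
    // declaring a null variable and handing it to another method to fill.
    static List<Integer> buildMessageTargets(List<Edge> edges, EdgeDirection direction) {
        List<Integer> targets = new ArrayList<>();
        for (Edge e : edges) {
            switch (direction) {
                case OUT:
                    targets.add(e.target);
                    break;
                case IN:
                    targets.add(e.source);
                    break;
                case ALL:
                    targets.add(e.target);
                    targets.add(e.source);
                    break;
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(new Edge(1, 2), new Edge(2, 3));
        // Assigned once, never null.
        List<Integer> messages = buildMessageTargets(edges, EdgeDirection.OUT);
        System.out.println(messages); // prints [2, 3]
    }
}
```

In the real class the same shape would mean that the per-direction methods return the messages DataSet rather than receive it as an argument, though whether that fits the rest of the PR is not something this sketch can show.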
> Vertex-centric iteration extensions
> -----------------------------------
>
> Key: FLINK-1523
> URL: https://issues.apache.org/jira/browse/FLINK-1523
> Project: Flink
> Issue Type: Improvement
> Components: Gelly
> Reporter: Vasia Kalavri
> Assignee: Andra Lungu
>
> We would like to make the following extensions to the vertex-centric
> iterations of Gelly:
> - allow vertices to access their in/out degrees and the total number of
> vertices of the graph, inside the iteration.
> - allow choosing the neighborhood type (in/out/all) over which to run the
> vertex-centric iteration. Currently, the model uses the updates of the
> in-neighbors to calculate state and sends messages to out-neighbors. We could
> add a parameter with the values "in/out/all" to the {{VertexUpdateFunction}}
> and {{MessagingFunction}} that would indicate the type of neighborhood.
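
To make the first extension concrete, here is a minimal sketch in plain Java (no Flink dependency) of the quantities it refers to: per-vertex in/out degrees and the total vertex count, derived from an edge list. The `VertexDegrees` class, the `degrees` helper, and the `int[]{source, target}` edge encoding are assumptions made for this example and are not part of Gelly.

```java
import java.util.HashMap;
import java.util.Map;

public class VertexDegrees {

    // Maps each vertex id to {inDegree, outDegree}.
    // Each edge is encoded as {source, target}. Vertices that appear in no
    // edge are not seen here, so the map size only counts connected vertices.
    static Map<Integer, long[]> degrees(int[][] edges) {
        Map<Integer, long[]> result = new HashMap<>();
        for (int[] e : edges) {
            result.computeIfAbsent(e[0], k -> new long[2])[1]++; // out-degree of source
            result.computeIfAbsent(e[1], k -> new long[2])[0]++; // in-degree of target
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] edges = { {1, 2}, {1, 3}, {2, 3} };
        Map<Integer, long[]> d = degrees(edges);
        System.out.println("vertices: " + d.size());                    // prints vertices: 3
        System.out.println("vertex 1 in/out: " + d.get(1)[0] + "/" + d.get(1)[1]); // prints vertex 1 in/out: 0/2
        System.out.println("vertex 3 in/out: " + d.get(3)[0] + "/" + d.get(3)[1]); // prints vertex 3 in/out: 2/0
    }
}
```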