[
https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501312#comment-14501312
]
ASF GitHub Bot commented on FLINK-1523:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/537#discussion_r28643463
--- Diff:
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * Incremental Single Sink Shortest Paths Example.
+ *
+ * The program takes as input the resulted graph after a SSSP computation,
+ * an edge to be removed and the initial graph(i.e. before SSSP was
computed).
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation
is necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest
path contains the removed edge,
+ * potentially require re-computation.
+ * When the edge <u, v> is removed, v checks if it has another out-going
SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value,
by setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE
message.
+ * When a vertex u receives an INVALIDATE message from v, it checks
whether it has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE
message.
+ * The propagation stops when a vertex with an alternative shortest path
is reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSPExample <vertex path> <edge
path> <edges in SSSP>
+ * <edge to be removed> <result path> <number of
iterations></code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSPExample implements ProgramDescription {
+
+ public static void main(String [] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Vertex<Long, Double>> vertices =
getVerticesDataSet(env);
+
+ DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+ DataSet<Edge<Long, Double>> edgesInSSSP =
getEdgesinSSSPDataSet(env);
+
+ Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices,
edges, env);
+
+ // Assumption: all minimum weight paths are kept
+ Graph<Long, Double, Double> ssspGraph =
Graph.fromDataSet(vertices, edgesInSSSP, env);
+
+ // remove the edge
+ graph.removeEdge(edgeToBeRemoved);
+
+ // configure the iteration
+ IterationConfiguration parameters = new
IterationConfiguration();
+
+ if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+
+ parameters.setDirection(EdgeDirection.IN);
+ parameters.setOptDegrees(true);
+
+ // run the vertex centric iteration to propagate info
+ Graph<Long, Double, Double> result =
ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(),
+ new
InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
+
+ DataSet<Vertex<Long, Double>> resultedVertices =
result.getVertices();
+
+ // Emit results
+ if(fileOutput) {
+ resultedVertices.writeAsCsv(outputPath, "\n",
",");
+ } else {
+ resultedVertices.print();
+ }
+
+ env.execute("Incremental SSSP Example");
+ } else {
+ // print the vertices
+ if(fileOutput) {
+ vertices.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ vertices.print();
+ }
+
+ env.execute("Incremental SSSP Example");
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "Incremental Single Sink Shortest Paths Example";
+ }
+
+ //
******************************************************************************************************************
+ // IncrementalSSSP METHODS
+ //
******************************************************************************************************************
+
+ /**
+ * Function that verifies whether the edge to be removed is part of the
SSSP or not.
+ * If it is, the src vertex will be invalidated.
+ *
+ * @param edgeToBeRemoved
+ * @param edgesInSSSP
+ * @return
+ */
+ private static boolean isInSSSP(final Edge<Long, Double>
edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
+
+ return edgesInSSSP.filter(new FilterFunction<Edge<Long,
Double>>() {
+ @Override
+ public boolean filter(Edge<Long, Double> edge) throws
Exception {
+ return edge.equals(edgeToBeRemoved);
+ }
+ }).count() > 0;
+ }
+
+ public static final class VertexDistanceUpdater extends
VertexUpdateFunction<Long, Double, Double> {
+
+ @Override
+ public void updateVertex(Vertex<Long, Double> vertex,
MessageIterator<Double> inMessages) throws Exception {
+ if (inMessages.hasNext()) {
+ Long outDegree = vertex.getOutDegree() - 1;
+ // check if the vertex has another SP-Edge
+ if (outDegree > 0) {
+ // there is another shortest path from
the source to this vertex
+ } else {
+ // set own value to infinity
+ setNewVertexValue(Double.MAX_VALUE);
+ }
+ }
+ }
+ }
+
+ public static final class InvalidateMessenger extends
MessagingFunction<Long, Double, Double, Double> {
+
+ private Edge<Long, Double> edgeToBeRemoved;
+
+ public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) {
+ this.edgeToBeRemoved = edgeToBeRemoved;
+ }
+
+ @Override
+ public void sendMessages(Vertex<Long, Double> vertex) throws
Exception {
+
+
+ if(getSuperstepNumber() == 1) {
+
if(vertex.getId().equals(edgeToBeRemoved.getSource())) {
+ // activate the edge target
+
sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE);
+ }
+ }
+
+ if(getSuperstepNumber() > 1) {
+ // invalidate all edges
+ for(Edge<Long, Double> edge : getEdges()) {
+ sendMessageTo(edge.getSource(),
Double.MAX_VALUE);
+ }
+ }
+ }
+ }
+
+ //
******************************************************************************************************************
+ // UTIL METHODS
+ //
******************************************************************************************************************
+
+ private static boolean fileOutput = false;
+
+ private static String verticesInputPath = null;
+
+ private static String edgesInputPath = null;
+
+ private static String edgesInSSSPInputPath = null;
+
+ private static String edgeToBeRemoved = null;
+
+ private static String outputPath = null;
+
+ private static int maxIterations = 5;
+
+ private static boolean parseParameters(String[] args) {
+ if (args.length > 0) {
+ if (args.length == 6) {
+ fileOutput = true;
+ verticesInputPath = args[0];
+ edgesInputPath = args[1];
+ edgesInSSSPInputPath = args[2];
+ edgeToBeRemoved = args[3];
+ outputPath = args[4];
+ maxIterations = Integer.parseInt(args[5]);
+ } else {
+ System.out.println("Executing IncrementalSSSP
example with default parameters and built-in default data.");
+ System.out.println("Provide parameters to read
input data from files.");
+ System.out.println("See the documentation for
the correct format of input files.");
+ System.out.println("Usage: IncrementalSSSP
<vertex path> <edge path> <edges in SSSP> <edge to be removed> <output path>
<max iterations>");
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static DataSet<Vertex<Long, Double>>
getVerticesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(verticesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Double.class)
+ .map(new MapFunction<Tuple2<Long,
Double>, Vertex<Long, Double>>() {
+
+ @Override
+ public Vertex<Long, Double>
map(Tuple2<Long, Double> tuple2) throws Exception {
+ return new Vertex<Long,
Double>(tuple2.f0, tuple2.f1);
+ }
+ });
+ } else {
+ System.err.println("Usage: IncrementalSSSP <vertex
path> <edge path> <edges in SSSP> <edge to be removed> " +
+ "<output path> <max iterations>");
+ return IncrementalSSSPData.getDefaultVertexDataSet(env);
+ }
+ }
+
+ private static DataSet<Edge<Long, Double>>
getEdgesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(edgesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class,
Double.class)
+ .map(new MapFunction<Tuple3<Long, Long,
Double>, Edge<Long, Double>>() {
+
+ @Override
+ public Edge<Long, Double>
map(Tuple3<Long, Long, Double> tuple3) throws Exception {
--- End diff --
and a Tuple3ToEdgeMap :-)
> Vertex-centric iteration extensions
> -----------------------------------
>
> Key: FLINK-1523
> URL: https://issues.apache.org/jira/browse/FLINK-1523
> Project: Flink
> Issue Type: Improvement
> Components: Gelly
> Reporter: Vasia Kalavri
> Assignee: Andra Lungu
>
> We would like to make the following extensions to the vertex-centric
> iterations of Gelly:
> - allow vertices to access their in/out degrees and the total number of
> vertices of the graph, inside the iteration.
> - allow choosing the neighborhood type (in/out/all) over which to run the
> vertex-centric iteration. Now, the model uses the updates of the in-neighbors
> to calculate state and send messages to out-neighbors. We could add a
> parameter with value "in/out/all" to the {{VertexUpdateFunction}} and
> {{MessagingFunction}}, that would indicate the type of neighborhood.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)