Repository: flink
Updated Branches:
  refs/heads/master f9eea5e5a -> f2186a604


[FLINK-2561] [gelly] add gelly-scala examples: vertex-centric SSSP, GSA SSSP
and how to use a library method (connected components).

This closes #1211


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186a60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186a60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186a60

Branch: refs/heads/master
Commit: f2186a604f407e7b6db534cf6f9e50e27eac765a
Parents: f9eea5e
Author: vasia <[email protected]>
Authored: Thu Oct 1 22:26:25 2015 +0200
Committer: vasia <[email protected]>
Committed: Wed Oct 7 22:37:08 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |   4 +-
 .../org/apache/flink/graph/scala/Graph.scala    |  16 +-
 .../scala/example/ConnectedComponents.scala     | 121 +++++++++++++
 .../example/GSASingleSourceShortestPaths.scala  | 156 +++++++++++++++++
 .../graph/scala/example/GraphMetrics.scala      |  19 +--
 .../example/SingleSourceShortestPaths.scala     | 170 +++++++++++++++++++
 .../graph/example/ConnectedComponents.java      |   2 +-
 .../utils/ConnectedComponentsDefaultData.java   |  21 ++-
 .../utils/SingleSourceShortestPathsData.java    |  37 ++--
 9 files changed, 501 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index fa2c86c..766b395 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -265,10 +265,10 @@ If no vertex input is provided during Graph creation, 
Gelly will automatically p
 val env = ExecutionEnvironment.getExecutionEnvironment
 
 // initialize the vertex value to be equal to the vertex ID
-val graph = Graph.fromCollection(edgeList, env,
+val graph = Graph.fromCollection(edgeList,
     new MapFunction[Long, Long] {
        def map(id: Long): Long = id
-    })
+    }, env)
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 38702f3..28f3f12 100644
--- 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -57,8 +57,8 @@ object Graph {
   * map function to the vertex ids.
   */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: 
ExecutionEnvironment,
-  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], mapper: 
MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
     wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, 
env.getJavaEnv))
   }
 
@@ -87,8 +87,8 @@ object Graph {
   * map function to the vertex ids.
   */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
-  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: 
ExecutionEnvironment,
-  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], mapper: MapFunction[K, 
VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
     wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, 
mapper, env.getJavaEnv))
   }
 
@@ -120,8 +120,8 @@ object Graph {
   * map function to the vertex ids.
   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: 
ExecutionEnvironment,
-  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], mapper: 
MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
     val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, 
v._3)).javaSet
     wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, 
env.getJavaEnv))
   }
@@ -230,7 +230,7 @@ object Graph {
 
       // initializer provided
       if (mapper != null) {
-        fromTupleDataSet[K, VV, EV](edges, env, mapper)
+        fromTupleDataSet[K, VV, EV](edges, mapper, env)
       }
       else {
         fromTupleDataSet[K, EV](edges, env) 
@@ -244,7 +244,7 @@ object Graph {
 
       // no initializer provided
       if (mapper != null) {
-        fromTupleDataSet[K, VV, NullValue](edges, env, mapper)
+        fromTupleDataSet[K, VV, NullValue](edges, mapper, env)
       }
       else {
         fromTupleDataSet[K, NullValue](edges, env) 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
new file mode 100644
index 0000000..b3da520
--- /dev/null
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.library.GSAConnectedComponents
+import java.lang.Long
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in [[org.apache.flink.graph.library]].
+ *
+ * In particular, this example uses the
+ * [[org.apache.flink.graph.library.GSAConnectedComponents]]
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage {{{
+ *   ConnectedComponents <edge path> <result path> <number of iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
+ */
+object ConnectedComponents {
+
+  /**
+   * Program entry point: parses the arguments, builds the graph from the edge data set
+   * (every vertex value is initialised to its own id), runs the GSA connected components
+   * library method and emits the result.
+   *
+   * @param args either empty (built-in default data, result is printed) or
+   *             `<edge path> <output path> <num iterations>` (result is written as CSV)
+   */
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
+
+    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
+
+    // emit result
+    if (fileOutput) {
+      components.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Connected Components Example")
+    } else {
+      components.print()
+    }
+  }
+
+  /** Vertex initializer: the initial value of a vertex is its own id. */
+  private final class InitVertices extends MapFunction[Long, Long] {
+    override def map(id: Long): Long = id
+  }
+
+  // ***********************************************************************
+  // UTIL METHODS
+  // ***********************************************************************
+
+  // Program configuration; the values below are used when no arguments are given.
+  private var fileOutput = false
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
+
+  /**
+   * Reads the program configuration from the command line.
+   *
+   * With no arguments the defaults are kept and a hint is printed. With exactly three
+   * arguments the edge path, output path and iteration count are taken from them. Any
+   * other argument count prints the usage to stderr and leaves the configuration
+   * untouched.
+   *
+   * @param args the command line arguments
+   * @return false if the argument count is invalid (the program must not run),
+   *         true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 0) {
+      System.out.println("Executing ConnectedComponents example with default parameters" +
+        " and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+        "<num iterations>")
+      true
+    } else if (args.length != 3) {
+      // wrong number of arguments: report it and stop before touching args(i)
+      System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+        "<num iterations>")
+      false
+    } else {
+      fileOutput = true
+      edgesInputPath = args(0)
+      outputPath = args(1)
+      // the iteration count is the third argument (previously the literal 2 was used)
+      maxIterations = args(2).toInt
+      true
+    }
+  }
+
+  /**
+   * Creates the edge data set: read from the tab-separated file at `edgesInputPath` when
+   * arguments were given, otherwise built from
+   * `ConnectedComponentsDefaultData.DEFAULT_EDGES`. Edges carry no value (NullValue).
+   *
+   * @param env the execution environment used to create the data set
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    } else {
+      // DEFAULT_EDGES is an Object[][] of {source, target} pairs
+      val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
+        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+      }
+      env.fromCollection(edgeData).map(
+        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
new file mode 100644
index 0000000..2dc272c
--- /dev/null
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+import org.apache.flink.graph.gsa.GatherFunction
+import org.apache.flink.graph.gsa.Neighbor
+import org.apache.flink.graph.gsa.SumFunction
+import org.apache.flink.graph.gsa.ApplyFunction
+
+/**
+ * This example shows how to use Gelly's gather-sum-apply iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * Usage {{{
+ *   GSASingleSourceShortestPaths <source vertex id> <input edges path> <output path>
+ *     <num iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object GSASingleSourceShortestPaths {
+
+  /**
+   * Program entry point: parses the arguments, builds the graph from the edge data set
+   * (distance 0.0 for the source vertex, positive infinity for all others), runs the
+   * gather-sum-apply iteration and emits the resulting vertices.
+   *
+   * @param args either empty (built-in default data, result is printed) or
+   *             `<source vertex id> <input edges path> <output path> <num iterations>`
+   *             (result is written as CSV)
+   */
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the gather-sum-apply iteration
+    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
+      new UpdateDistance, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("GSA Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  /** Vertex initializer: distance 0.0 for the source vertex, positive infinity otherwise. */
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long): Double = {
+      if (id == srcId) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /** Gather: candidate distance through a neighbor = neighbor value + edge value. */
+  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+    override def gather(neighbor: Neighbor[Double, Double]): Double = {
+      neighbor.getNeighborValue + neighbor.getEdgeValue
+    }
+  }
+
+  /** Sum: keeps the smaller of two candidate distances. */
+  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+    override def sum(newValue: Double, currentValue: Double): Double = {
+      Math.min(newValue, currentValue)
+    }
+  }
+
+  /** Apply: stores the candidate distance only if it improves on the current one. */
+  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+    override def apply(newDistance: Double, oldDistance: Double): Unit = {
+      if (newDistance < oldDistance) {
+        setResult(newDistance)
+      }
+    }
+  }
+
+  // **************************************************************************
+  // UTIL METHODS
+  // **************************************************************************
+
+  // Program configuration; the values below are used when no arguments are given.
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  /**
+   * Reads the program configuration from the command line.
+   *
+   * With no arguments the defaults are kept and a hint is printed. With exactly four
+   * arguments the source vertex id, edge path, output path and iteration count are taken
+   * from them. Any other argument count prints the usage to stderr and leaves the
+   * configuration untouched.
+   *
+   * @param args the command line arguments
+   * @return false if the argument count is invalid (the program must not run),
+   *         true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 0) {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+      true
+    } else if (args.length != 4) {
+      // wrong number of arguments: report it and stop before touching args(i)
+      System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+      false
+    } else {
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      // the iteration count is the fourth argument (previously the literal 3 was used)
+      maxIterations = args(3).toInt
+      true
+    }
+  }
+
+  /**
+   * Creates the edge data set: read from the tab-separated file at `edgesInputPath` when
+   * arguments were given, otherwise built from
+   * `SingleSourceShortestPathsData.DEFAULT_EDGES`.
+   *
+   * @param env the execution environment used to create the data set
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      // DEFAULT_EDGES is an Object[][] of {source, target, distance} triples
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
index 68d9285..4eed824 100644
--- 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -106,21 +106,20 @@ object GraphMetrics {
   private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, 
NullValue]] = {
     if (fileOutput) {
       env.readCsvFile[(Long, Long)](
-          edgesPath,
-          fieldDelimiter = "\t").map(
-          in => new Edge[Long, NullValue](in._1, in._2, 
NullValue.getInstance()))
-    }
-    else {
+        edgesPath,
+        fieldDelimiter = "\t").map(
+        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+    } else {
       env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-         (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-         val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
           for ( i <- 0 to numOutEdges ) {
             var target: Long = ((Math.random() * numVertices) + 1).toLong
-              new Edge[Long, NullValue](key, target, NullValue.getInstance())
+            new Edge[Long, NullValue](key, target, NullValue.getInstance())
           }
-        })
-      }
+      })
     }
+  }
 
   private var fileOutput: Boolean = false
   private var edgesPath: String = null

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
new file mode 100644
index 0000000..65a8e7f
--- /dev/null
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.spargel.VertexUpdateFunction
+import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.spargel.MessagingFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+
+/**
+ * This example shows how to use Gelly's vertex-centric iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * Usage {{{
+ *   SingleSourceShortestPaths <source vertex id> <input edges path> <output path>
+ *     <num iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+
+  /**
+   * Program entry point: parses the arguments, builds the graph from the edge data set
+   * (distance 0.0 for the source vertex, positive infinity for all others), runs the
+   * vertex-centric iteration and emits the resulting vertices.
+   *
+   * @param args either empty (built-in default data, result is printed) or
+   *             `<source vertex id> <input edges path> <output path> <num iterations>`
+   *             (result is written as CSV)
+   */
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the vertex-centric iteration
+    val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
+      new MinDistanceMessenger, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  /** Vertex initializer: distance 0.0 for the source vertex, positive infinity otherwise. */
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long): Double = {
+      if (id == srcId) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /**
+   * Function that updates the value of a vertex by picking the minimum
+   * distance from all incoming messages.
+   */
+  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
+      // Start from positive infinity, the same "unreached" marker InitVertices assigns.
+      // Seeding with Double.MaxValue would overwrite an unreached vertex (value = infinity)
+      // with Double.MaxValue as soon as it receives only infinite messages.
+      var minDistance = Double.PositiveInfinity
+      while (inMessages.hasNext) {
+        val msg = inMessages.next
+        if (msg < minDistance) {
+          minDistance = msg
+        }
+      }
+      // only a strictly shorter distance changes the vertex value
+      if (vertex.getValue > minDistance) {
+        setNewVertexValue(minDistance)
+      }
+    }
+  }
+
+  /**
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value.
+   */
+  private final class MinDistanceMessenger extends
+    MessagingFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      for (edge: Edge[Long, Double] <- getEdges) {
+        sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
+      }
+    }
+  }
+
+  // ****************************************************************************
+  // UTIL METHODS
+  // ****************************************************************************
+
+  // Program configuration; the values below are used when no arguments are given.
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  /**
+   * Reads the program configuration from the command line.
+   *
+   * With no arguments the defaults are kept and a hint is printed. With exactly four
+   * arguments the source vertex id, edge path, output path and iteration count are taken
+   * from them. Any other argument count prints the usage to stderr and leaves the
+   * configuration untouched.
+   *
+   * @param args the command line arguments
+   * @return false if the argument count is invalid (the program must not run),
+   *         true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 0) {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+      true
+    } else if (args.length != 4) {
+      // wrong number of arguments: report it and stop before touching args(i)
+      System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+      false
+    } else {
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      // the iteration count is the fourth argument (previously the literal 3 was used)
+      maxIterations = args(3).toInt
+      true
+    }
+  }
+
+  /**
+   * Creates the edge data set: read from the tab-separated file at `edgesInputPath` when
+   * arguments were given, otherwise built from
+   * `SingleSourceShortestPathsData.DEFAULT_EDGES`.
+   *
+   * @param env the execution environment used to create the data set
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      // DEFAULT_EDGES is an Object[][] of {source, target, distance} triples
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index 4189602..cd52e04 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.NullValue;
  * This example shows how to use Gelly's library methods.
  * You can find all available library methods in {@link 
org.apache.flink.graph.library}. 
  * 
- * In particular, this example uses the {@link 
org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * In particular, this example uses the {@link 
org.apache.flink.graph.library.GSAConnectedComponents}
  * library method to compute the connected components of the input graph.
  *
  * The input file is a plain text file and must be formatted as follows:

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
index b9556a9..67864eb 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.types.NullValue;
 
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -36,14 +36,19 @@ public class ConnectedComponentsDefaultData {
 
        public static final String EDGES = "1   2\n" + "2       3\n" + "2       
4\n" + "3       4";
 
-       public static DataSet<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-               List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, 
NullValue>>();
-               edges.add(new Edge<Long, NullValue>(1L, 2L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(2L, 3L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(2L, 4L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(3L, 4L, 
NullValue.getInstance()));
+       public static final Object[][] DEFAULT_EDGES = new Object[][] {
+               new Object[]{1L, 2L},
+               new Object[]{2L, 3L},
+               new Object[]{2L, 4L},
+               new Object[]{3L, 4L}
+       };
 
-               return env.fromCollection(edges);
+       public static DataSet<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
+               List<Edge<Long, NullValue>> edgeList = new 
LinkedList<Edge<Long, NullValue>>();
+               for (Object[] edge : DEFAULT_EDGES) {
+                       edgeList.add(new Edge<Long, NullValue>((Long) edge[0], 
(Long) edge[1], NullValue.getInstance()));
+               }
+               return env.fromCollection(edgeList);
        }
 
        public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + 
"3,1\n" + "4,1";

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
index cf0034a..6b985c5 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.graph.example.utils;
 
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Provides the default data set used for the Single Source Shortest Paths 
example program.
  * If no parameters are given to the program, the default edge data set is 
used.
@@ -36,22 +36,27 @@ public class SingleSourceShortestPathsData {
        public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + 
"2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
                                        "4\t5\t45.0\n" + "5\t1\t51.0";
 
-       public static final DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
-               edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
-               edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
-               edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
-               edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
-               edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
-               edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-
-               return env.fromCollection(edges);
-       }
+       public static final Object[][] DEFAULT_EDGES = new Object[][] {
+               new Object[]{1L, 2L, 12.0},
+               new Object[]{1L, 3L, 13.0},
+               new Object[]{2L, 3L, 23.0},
+               new Object[]{3L, 4L, 34.0},
+               new Object[]{3L, 5L, 35.0},
+               new Object[]{4L, 5L, 45.0},
+               new Object[]{5L, 1L, 51.0}
+       };
 
        public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS =  
"1,0.0\n" + "2,12.0\n" + "3,13.0\n" + 
                                                                "4,47.0\n" + 
"5,48.0";
 
+       public static DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
+               
+               List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, 
Double>>();
+               for (Object[] edge : DEFAULT_EDGES) {
+                       edgeList.add(new Edge<Long, Double>((Long) edge[0], 
(Long) edge[1], (Double) edge[2]));
+               }
+               return env.fromCollection(edgeList);
+       }
+
        private SingleSourceShortestPathsData() {}
 }

Reply via email to