[FLINK-3511] [gelly] Introduce flink-gelly-examples module

The new flink-gelly-examples module contains all Java and Scala Gelly examples. The module
contains compile-scope dependencies on flink-java, flink-scala and flink-clients so that
the examples can be conveniently run from within the IDE.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c605d27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c605d27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c605d27

Branch: refs/heads/release-1.0
Commit: 2c605d275b26793d8676e35b6ccc5102bdcbf30d
Parents: c0bc8bc
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 26 14:08:02 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Fri Feb 26 20:57:04 2016 +0100

----------------------------------------------------------------------
 flink-libraries/flink-gelly-examples/pom.xml    | 206 +++++++++++++
 .../graph/examples/ConnectedComponents.java     | 141 +++++++++
 .../graph/examples/EuclideanGraphWeighing.java  | 214 +++++++++++++
 .../examples/GSASingleSourceShortestPaths.java  | 191 ++++++++++++
 .../flink/graph/examples/GraphMetrics.java      | 170 +++++++++++
 .../flink/graph/examples/IncrementalSSSP.java   | 266 ++++++++++++++++
 .../examples/JaccardSimilarityMeasure.java      | 214 +++++++++++++
 .../flink/graph/examples/MusicProfiles.java     | 303 +++++++++++++++++++
 .../examples/SingleSourceShortestPaths.java     | 201 ++++++++++++
 .../examples/data/CommunityDetectionData.java   |  95 ++++++
 .../data/ConnectedComponentsDefaultData.java    |  57 ++++
 .../graph/examples/data/EuclideanGraphData.java |  86 ++++++
 .../examples/data/IncrementalSSSPData.java      |  95 ++++++
 .../data/JaccardSimilarityMeasureData.java      |  58 ++++
 .../examples/data/LabelPropagationData.java     | 114 +++++++
 .../graph/examples/data/MusicProfilesData.java  | 108 +++++++
 .../flink/graph/examples/data/PageRankData.java |  69 +++++
 .../data/SingleSourceShortestPathsData.java     |  62 ++++
 .../graph/examples/data/SummarizationData.java  | 134 ++++++++
 .../graph/examples/data/TriangleCountData.java  |  65 ++++
 .../graph/examples/utils/ExampleUtils.java      | 162 ++++++++++
 .../scala/examples/ConnectedComponents.scala    | 120 ++++++++
 .../examples/GSASingleSourceShortestPaths.scala | 150 +++++++++
 .../graph/scala/examples/GraphMetrics.scala     | 129 ++++++++
 .../examples/SingleSourceShortestPaths.scala    | 170 +++++++++++
 .../graph/library/CommunityDetectionITCase.java |  81 +++++
 .../graph/library/LabelPropagationITCase.java   |  79 +++++
 .../flink/graph/library/PageRankITCase.java     | 128 ++++++++
 .../graph/library/SummarizationITCase.java      | 188 ++++++++++++
 .../graph/library/TriangleCountITCase.java      |  53 ++++
 .../graph/library/TriangleEnumeratorITCase.java |  57 ++++
 .../flink/graph/test/GatherSumApplyITCase.java  | 106 +++++++
 .../examples/ConnectedComponentsITCase.java     |  72 +++++
 .../examples/EuclideanGraphWeighingITCase.java  |  78 +++++
 .../test/examples/IncrementalSSSPITCase.java    | 134 ++++++++
 .../JaccardSimilarityMeasureITCase.java         |  73 +++++
 .../test/examples/MusicProfilesITCase.java      | 102 +++++++
 .../SingleSourceShortestPathsITCase.java        |  82 +++++
 .../scala/example/ConnectedComponents.scala     | 120 --------
 .../example/GSASingleSourceShortestPaths.scala  | 150 ---------
 .../graph/scala/example/GraphMetrics.scala      | 128 --------
 .../example/SingleSourceShortestPaths.scala     | 170 -----------
 .../graph/example/ConnectedComponents.java      | 141 ---------
 .../graph/example/EuclideanGraphWeighing.java   | 214 -------------
 .../example/GSASingleSourceShortestPaths.java   | 191 ------------
 .../flink/graph/example/GraphMetrics.java       | 170 -----------
 .../flink/graph/example/IncrementalSSSP.java    | 266 ----------------
 .../graph/example/JaccardSimilarityMeasure.java | 214 -------------
 .../flink/graph/example/MusicProfiles.java      | 303 -------------------
 .../example/SingleSourceShortestPaths.java      | 201 ------------
 .../example/utils/CommunityDetectionData.java   |  95 ------
 .../utils/ConnectedComponentsDefaultData.java   |  57 ----
 .../graph/example/utils/EuclideanGraphData.java |  86 ------
 .../flink/graph/example/utils/ExampleUtils.java | 162 ----------
 .../example/utils/IncrementalSSSPData.java      |  95 ------
 .../utils/JaccardSimilarityMeasureData.java     |  58 ----
 .../example/utils/LabelPropagationData.java     | 114 -------
 .../graph/example/utils/MusicProfilesData.java  | 108 -------
 .../flink/graph/example/utils/PageRankData.java |  69 -----
 .../utils/SingleSourceShortestPathsData.java    |  62 ----
 .../graph/example/utils/SummarizationData.java  | 134 --------
 .../graph/example/utils/TriangleCountData.java  |  65 ----
 .../graph/library/ConnectedComponents.java      |   2 +-
 .../graph/library/GSAConnectedComponents.java   |   2 +-
 .../library/SingleSourceShortestPaths.java      |   2 +-
 .../flink/graph/library/TriangleEnumerator.java |   4 +-
 ...ctedComponentsWithRandomisedEdgesITCase.java |  93 ++++++
 .../flink/graph/test/GatherSumApplyITCase.java  | 106 -------
 .../test/example/ConnectedComponentsITCase.java |  71 -----
 .../example/EuclideanGraphWeighingITCase.java   |  77 -----
 .../test/example/IncrementalSSSPITCase.java     | 133 --------
 .../example/JaccardSimilarityMeasureITCase.java |  72 -----
 .../graph/test/example/MusicProfilesITCase.java | 101 -------
 .../SingleSourceShortestPathsITCase.java        |  81 -----
 .../test/library/CommunityDetectionITCase.java  |  82 -----
 ...ctedComponentsWithRandomisedEdgesITCase.java |  94 ------
 .../test/library/LabelPropagationITCase.java    |  80 -----
 .../graph/test/library/PageRankITCase.java      | 130 --------
 .../graph/test/library/SummarizationITCase.java | 189 ------------
 .../graph/test/library/TriangleCountITCase.java |  54 ----
 .../test/library/TriangleEnumeratorITCase.java  |  58 ----
 flink-libraries/pom.xml                         |   1 +
 82 files changed, 4912 insertions(+), 4706 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml 
b/flink-libraries/flink-gelly-examples/pom.xml
new file mode 100644
index 0000000..2b84cc1
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -0,0 +1,206 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-libraries</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <!-- Scala-suffixed artifact id, named consistently with the module name and with the
+            flink-gelly_2.10 / flink-gelly-scala_2.10 artifacts this module depends on -->
+       <artifactId>flink-gelly-examples_2.10</artifactId>
+       <name>flink-gelly-examples</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-gelly_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-gelly-scala_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the 
process-resources phase, so that dependencies on
+                        scala classes can be resolved later in the (Java) 
compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the 
process-test-resources phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) 
test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               
<phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                                       <compilerPlugins 
combine.children="append">
+                                               <compilerPlugin>
+                                                       
<groupId>org.scalamacros</groupId>
+                                                       
<artifactId>paradise_${scala.version}</artifactId>
+                                                       
<version>${scala.macros.version}</version>
+                                               </compilerPlugin>
+                                       </compilerPlugins>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               
<exclude>org.scala-lang:scala-library</exclude>
+                                               
<exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               
<sourceInclude>**/*.scala</sourceInclude>
+                                               
<sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               
<artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               
<phase>generate-test-sources</phase>
+                                               <goals>
+                                                       
<goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <version>0.5.0</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <verbose>false</verbose>
+                                       <failOnViolation>true</failOnViolation>
+                                       
<includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <failOnWarning>false</failOnWarning>
+                                       
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                                       
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                                       
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                                       
<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
new file mode 100644
index 0000000..93c801f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in {@link 
org.apache.flink.graph.library}. 
+ * 
+ * In particular, this example uses the {@link GSAConnectedComponents}
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 with and 1-3.
+ *
+ * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; </code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link ConnectedComponentsDefaultData}
+ */
+public class ConnectedComponents implements ProgramDescription {
+
+       @SuppressWarnings("serial")
+       public static void main(String [] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
+
+               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new MapFunction<Long, Long>() {
+                       @Override
+                       public Long map(Long value) throws Exception {
+                               return value;
+                       }
+               }, env);
+
+               DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
+                               .run(new GSAConnectedComponents<Long, 
NullValue>(maxIterations));
+
+               // emit result
+               if (fileOutput) {
+                       verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
+
+                       // since file sinks are lazy, we trigger the execution 
explicitly
+                       env.execute("Connected Components Example");
+               } else {
+                       verticesWithMinIds.print();
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "Connected Components Example";
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+       private static Integer maxIterations = 
ConnectedComponentsDefaultData.MAX_ITERATIONS;
+
+       private static boolean parseParameters(String [] args) {
+               if(args.length > 0) {
+                       if(args.length != 3) {
+                               System.err.println("Usage ConnectedComponents 
<edge path> <output path> " +
+                                               "<num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       edgeInputPath = args[0];
+                       outputPath = args[1];
+                       maxIterations = Integer.parseInt(args[2]);
+
+               } else {
+                       System.out.println("Executing ConnectedComponents 
example with default parameters and built-in default data.");
+                       System.out.println("Provide parameters to read input 
data from files.");
+                       System.out.println("Usage ConnectedComponents <edge 
path> <output path> " +
+                                       "<num iterations>");
+               }
+
+               return true;
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
+
+               if(fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .ignoreComments("#")
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
+                                               @Override
+                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
+                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
+                                               }
+                                       });
+               } else {
+                       return 
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java
new file mode 100644
index 0000000..bd6111d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.graph.examples.data.EuclideanGraphData;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.Vertex;
+
+import java.io.Serializable;
+
+/**
+ * This example shows how to use Gelly's {@link Graph#getTriplets()} and
+ * {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)} methods.
+ * 
+ * Given a directed, unweighted graph, with vertex values representing points in a plane,
+ * return a weighted graph where the edge weights are equal to the Euclidean distance between the
+ * src and the trg vertex values.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ *     <li> Vertices are represented by their vertexIds and vertex values and 
are separated by newlines,
+ *     the value being formed of two doubles separated by a comma.
+ *     For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a 
data set of three vertices
+ *     <li> Edges are represented by pairs of srcVertexId, trgVertexId 
separated by commas.
+ *     Edges themselves are separated by newlines.
+ *     For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
+ * </ul>
+ *
+ * Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; 
&lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link EuclideanGraphData}
+ */
+@SuppressWarnings("serial")
+public class EuclideanGraphWeighing implements ProgramDescription {
+
+       /**
+        * Runs the example. It builds a graph from the vertex and edge inputs, replaces every edge
+        * value with the Euclidean distance between the edge's endpoints, and emits the resulting edges.
+        *
+        * @param args command-line arguments interpreted by {@code parseParameters}
+        *             (vertex path, edge path and output path when reading from files)
+        */
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
+
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+               Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+               // the edge value will be the Euclidean distance between its src and trg vertex
+               DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
+                               .map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
+
+                                       @Override
+                                       public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
+                                                       throws Exception {
+
+                                               Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
+                                               Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
+
+                                               return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
+                                                               srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
+                                       }
+                               });
+
+               // the (src id, trg id, distance) tuples are joined back onto the graph's edges;
+               // the join function simply keeps the computed distance as the new edge value
+               Graph<Long, Point, Double> weightedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
+                               new EdgeJoinFunction<Double, Double>() {
+
+                                       @Override
+                                       public Double edgeJoin(Double edgeValue, Double inputValue) {
+                                               return inputValue;
+                                       }
+                               });
+
+               // retrieve the edges from the final result
+               DataSet<Edge<Long, Double>> result = weightedGraph.getEdges();
+
+               // emit result
+               if (fileOutput) {
+                       result.writeAsCsv(outputPath, "\n", ",");
+
+                       // since file sinks are lazy, we trigger the execution explicitly
+                       env.execute("Euclidean Graph Weighing Example");
+               } else {
+                       result.print();
+               }
+       }
+
+       /**
+        * Returns a short human-readable description of this example program.
+        */
+       @Override
+       public String getDescription() {
+               return "Weighing a graph by computing the Euclidean distance between its vertices";
+       }
+
+       // 
*************************************************************************
+       //     DATA TYPES
+       // 
*************************************************************************
+
+       /**
+        * A simple two-dimensional point.
+        */
+       public static class Point implements Serializable {
+
+               public double x, y;
+
+               public Point() {}
+
+               public Point(double x, double y) {
+                       this.x = x;
+                       this.y = y;
+               }
+
+               public double euclideanDistance(Point other) {
+                       return Math.sqrt((x-other.x)*(x-other.x) + 
(y-other.y)*(y-other.y));
+               }
+
+               @Override
+               public String toString() {
+                       return x + " " + y;
+               }
+       }
+
+       // 
******************************************************************************************************************
+       // UTIL METHODS
+       // 
******************************************************************************************************************
+
	// Set to true by parseParameters when exactly three arguments are given; it selects
	// file-based input and CSV output instead of the built-in data and print().
	private static boolean fileOutput = false;

	// Path of the vertex file (one id, x, y record per line); only read when fileOutput is true.
	private static String verticesInputPath = null;

	// Path of the edge file (one source id, target id record per line); only read when fileOutput is true.
	private static String edgesInputPath = null;

	// Path the weighted edges are written to as CSV when fileOutput is true.
	private static String outputPath = null;
+
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       if (args.length == 3) {
+                               fileOutput = true;
+                               verticesInputPath = args[0];
+                               edgesInputPath = args[1];
+                               outputPath = args[2];
+                       } else {
+                               System.out.println("Executing Euclidean Graph 
Weighing example with default parameters and built-in default data.");
+                               System.out.println("Provide parameters to read 
input data from files.");
+                               System.out.println("See the documentation for 
the correct format of input files.");
+                               System.err.println("Usage: 
EuclideanGraphWeighing <input vertices path> <input edges path>" +
+                                               " <output path>");
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private static DataSet<Vertex<Long, Point>> 
getVerticesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(verticesInputPath)
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Double.class, 
Double.class)
+                                       .map(new MapFunction<Tuple3<Long, 
Double, Double>, Vertex<Long, Point>>() {
+
+                                               @Override
+                                               public Vertex<Long, Point> 
map(Tuple3<Long, Double, Double> value) throws Exception {
+                                                       return new Vertex<Long, 
Point>(value.f0, new Point(value.f1, value.f2));
+                                               }
+                                       });
+               } else {
+                       return EuclideanGraphData.getDefaultVertexDataSet(env);
+               }
+       }
+
+       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, Double>>() {
+
+                                               @Override
+                                               public Edge<Long, Double> 
map(Tuple2<Long, Long> tuple2) throws Exception {
+                                                       return new Edge<Long, 
Double>(tuple2.f0, tuple2.f1, 0.0);
+                                               }
+                                       });
+               } else {
+                       return EuclideanGraphData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..1732016
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use Gelly's Gather-Sum-Apply iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a vertex-centric implementation of the same algorithm, please refer to 
{@link SingleSourceShortestPaths}. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link SingleSourceShortestPathsData}
+ */
+public class GSASingleSourceShortestPaths implements ProgramDescription {
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Program
+       // 
--------------------------------------------------------------------------------------------
+
+       public static void main(String[] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(srcVertexId), env);
+
+               // Execute the GSA iteration
+               Graph<Long, Double, Double> result = 
graph.runGatherSumApplyIteration(
+                               new CalculateDistances(), new 
ChooseMinDistance(), new UpdateDistance(), maxIterations);
+
+               // Extract the vertices as the result
+               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
+
+               // emit result
+               if(fileOutput) {
+                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
",");
+
+                       // since file sinks are lazy, we trigger the execution 
explicitly
+                       env.execute("GSA Single Source Shortest Paths");
+               } else {
+                       singleSourceShortestPaths.print();
+               }
+
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Single Source Shortest Path UDFs
+       // 
--------------------------------------------------------------------------------------------
+
+       @SuppressWarnings("serial")
+       private static final class InitVertices implements MapFunction<Long, 
Double>{
+
+               private long srcId;
+
+               public InitVertices(long srcId) {
+                       this.srcId = srcId;
+               }
+
+               public Double map(Long id) {
+                       if (id.equals(srcId)) {
+                               return 0.0;
+                       }
+                       else {
+                               return Double.POSITIVE_INFINITY;
+                       }
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class CalculateDistances extends 
GatherFunction<Double, Double, Double> {
+
+               public Double gather(Neighbor<Double, Double> neighbor) {
+                       return neighbor.getNeighborValue() + 
neighbor.getEdgeValue();
+               }
+       };
+
+       @SuppressWarnings("serial")
+       private static final class ChooseMinDistance extends 
SumFunction<Double, Double, Double> {
+
+               public Double sum(Double newValue, Double currentValue) {
+                       return Math.min(newValue, currentValue);
+               }
+       };
+
+       @SuppressWarnings("serial")
+       private static final class UpdateDistance extends ApplyFunction<Long, 
Double, Double> {
+
+               public void apply(Double newDistance, Double oldDistance) {
+                       if (newDistance < oldDistance) {
+                               setResult(newDistance);
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Util methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private static boolean fileOutput = false;
+
+       private static Long srcVertexId = 1l;
+
+       private static String edgesInputPath = null;
+
+       private static String outputPath = null;
+
+       private static int maxIterations = 5;
+
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       if(args.length != 4) {
+                               System.err.println("Usage: 
GSASingleSourceShortestPaths <source vertex id>" +
+                                               " <input edges path> <output 
path> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       srcVertexId = Long.parseLong(args[0]);
+                       edgesInputPath = args[1];
+                       outputPath = args[2];
+                       maxIterations = Integer.parseInt(args[3]);
+               } else {
+                               System.out.println("Executing GSASingle Source 
Shortest Paths example "
+                                               + "with default parameters and 
built-in default data.");
+                               System.out.println("  Provide parameters to 
read input data from files.");
+                               System.out.println("  See the documentation for 
the correct format of input files.");
+                               System.out.println("Usage: 
GSASingleSourceShortestPaths <source vertex id>" +
+                                               " <input edges path> <output 
path> <num iterations>");
+               }
+               return true;
+       }
+
+       private static DataSet<Edge<Long, Double>> 
getEdgeDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class, 
Double.class)
+                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
+               } else {
+                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "GSA Single Source Shortest Paths";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
new file mode 100644
index 0000000..9058538
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.graph.examples.utils.ExampleUtils;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This example illustrates how to use Gelly metrics methods and get simple 
statistics
+ * from the input graph.  
+ * 
+ * The program creates a random graph and computes and prints
+ * the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * "&lt;sourceVertexID&gt;\t&lt;targetVertexID&gt;".
+ * If no arguments are provided, the example runs with a random graph of 100 
vertices.
+ *
+ */
+public class GraphMetrics implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               /** create the graph **/
+               Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(getEdgesDataSet(env), env);
+               
+               /** get the number of vertices **/
+               long numVertices = graph.numberOfVertices();
+               
+               /** get the number of edges **/
+               long numEdges = graph.numberOfEdges();
+               
+               /** compute the average node degree **/
+               DataSet<Tuple2<Long, Long>> verticesWithDegrees = 
graph.getDegrees();
+
+               DataSet<Double> avgNodeDegree = verticesWithDegrees
+                               .aggregate(Aggregations.SUM, 1).map(new 
AvgNodeDegreeMapper(numVertices));
+               
+               /** find the vertex with the maximum in-degree **/
+               DataSet<Long> maxInDegreeVertex = 
graph.inDegrees().maxBy(1).map(new ProjectVertexId());
+
+               /** find the vertex with the minimum in-degree **/
+               DataSet<Long> minInDegreeVertex = 
graph.inDegrees().minBy(1).map(new ProjectVertexId());
+
+               /** find the vertex with the maximum out-degree **/
+               DataSet<Long> maxOutDegreeVertex = 
graph.outDegrees().maxBy(1).map(new ProjectVertexId());
+
+               /** find the vertex with the minimum out-degree **/
+               DataSet<Long> minOutDegreeVertex = 
graph.outDegrees().minBy(1).map(new ProjectVertexId());
+               
+               /** print the results **/
+               ExampleUtils.printResult(env.fromElements(numVertices), "Total 
number of vertices");
+               ExampleUtils.printResult(env.fromElements(numEdges), "Total 
number of edges");
+               ExampleUtils.printResult(avgNodeDegree, "Average node degree");
+               ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max 
in-degree");
+               ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min 
in-degree");
+               ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max 
out-degree");
+               ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min 
out-degree");
+
+               env.execute();
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AvgNodeDegreeMapper implements 
MapFunction<Tuple2<Long, Long>, Double> {
+
+               private long numberOfVertices;
+
+               public AvgNodeDegreeMapper(long numberOfVertices) {
+                       this.numberOfVertices = numberOfVertices;
+               }
+
+               public Double map(Tuple2<Long, Long> sumTuple) {
+                       return (double) (sumTuple.f1 / numberOfVertices) ;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ProjectVertexId implements 
MapFunction<Tuple2<Long,Long>, Long> {
+               public Long map(Tuple2<Long, Long> value) { return value.f0; }
+       }
+
+       @Override
+       public String getDescription() {
+               return "Graph Metrics Example";
+       }
+
+       // 
******************************************************************************************************************
+       // UTIL METHODS
+       // 
******************************************************************************************************************
+
+       private static boolean fileOutput = false;
+
+       private static String edgesInputPath = null;
+
+       static final int NUM_VERTICES = 100;
+
+       static final long SEED = 9876;
+
+       private static boolean parseParameters(String[] args) {
+
+               if(args.length > 0) {
+                       if(args.length != 1) {
+                               System.err.println("Usage: GraphMetrics <input 
edges>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       edgesInputPath = args[0];
+               } else {
+                       System.out.println("Executing Graph Metrics example 
with default parameters and built-in default data.");
+                       System.out.println("  Provide parameters to read input 
data from files.");
+                       System.out.println("  See the documentation for the 
correct format of input files.");
+                       System.out.println("Usage: GraphMetrics <input edges>");
+               }
+               return true;
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       
.lineDelimiter("\n").fieldDelimiter("\t")
+                                       .types(Long.class, Long.class).map(
+                                                       new 
MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+                                                               public 
Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
+                                                                       return 
new Edge<Long, NullValue>(value.f0, value.f1, 
+                                                                               
        NullValue.getInstance());
+                                                               }
+                                       });
+               } else {
+                       return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
new file mode 100644
index 0000000..26e419f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This example illustrates how to 
+ * <ul>
+ *  <li> create a Graph directly from CSV files
+ *  <li> use the scatter-gather iteration's messaging direction configuration 
option
+ * </ul>
+ * 
+ * Incremental Single Sink Shortest Paths Example. Shortest Paths are 
incrementally updated
+ * upon edge removal.
+ *
+ * The program takes as input the resulted graph after a SSSP computation,
+ * an edge to be removed and the initial graph(i.e. before SSSP was computed).
+ * In the following description, SP-graph is used as an abbreviation for
+ * the graph resulted from the SSSP computation. We denote the edges that 
belong to this
+ * graph by SP-edges.
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation is 
necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest path 
contains the removed edge,
+ * potentially require re-computation.
+ * When the edge {@code <u, v>} is removed, v checks if it has another 
out-going SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value, by 
setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE 
message.
+ * When a vertex u receives an INVALIDATE message from v, it checks whether it 
has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE 
message.
+ * The propagation stops when a vertex with an alternative shortest path is 
reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSP &lt;vertex path&gt; &lt;edge path&gt; &lt;edges 
in SSSP&gt;
+ * &lt;src id edge to be removed&gt; &lt;trg id edge to be removed&gt; &lt;val 
edge to be removed&gt;
+ * &lt;result path&gt; &lt;number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSP implements ProgramDescription {
+
+       public static void main(String [] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+               Graph<Long, Double, Double> graph = 
IncrementalSSSP.getGraph(env);
+
+               // Assumption: all minimum weight paths are kept
+               Graph<Long, Double, Double> ssspGraph = 
IncrementalSSSP.getSSSPGraph(env);
+
+               // remove the edge
+               graph.removeEdge(edgeToBeRemoved);
+
+               // configure the iteration
+               ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
+
+               if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) {
+
+                       parameters.setDirection(EdgeDirection.IN);
+                       parameters.setOptDegrees(true);
+
+                       // run the scatter-gather iteration to propagate info
+                       Graph<Long, Double, Double> result = 
ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(),
+                                       new 
InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
+
+                       DataSet<Vertex<Long, Double>> resultedVertices = 
result.getVertices();
+
+                       // Emit results
+                       if(fileOutput) {
+                               resultedVertices.writeAsCsv(outputPath, "\n", 
",");
+                               env.execute("Incremental SSSP Example");
+                       } else {
+                               resultedVertices.print();
+                       }
+               } else {
+                       // print the vertices
+                       if(fileOutput) {
+                               graph.getVertices().writeAsCsv(outputPath, 
"\n", ",");
+                               env.execute("Incremental SSSP Example");
+                       } else {
+                               graph.getVertices().print();
+                       }
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "Incremental Single Sink Shortest Paths Example";
+       }
+
+       // 
******************************************************************************************************************
+       // IncrementalSSSP METHODS
+       // 
******************************************************************************************************************
+
+       /**
+        * Function that verifies whether the edge to be removed is part of the 
SSSP or not.
+        * If it is, the src vertex will be invalidated.
+        *
+        * @param edgeToBeRemoved
+        * @param edgesInSSSP
+        * @return true or false
+        */
+       public static boolean isInSSSP(final Edge<Long, Double> 
edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
+
+               return edgesInSSSP.filter(new FilterFunction<Edge<Long, 
Double>>() {
+                       @Override
+                       public boolean filter(Edge<Long, Double> edge) throws 
Exception {
+                               return edge.equals(edgeToBeRemoved);
+                       }
+               }).count() > 0;
+       }
+
+       public static final class VertexDistanceUpdater extends 
VertexUpdateFunction<Long, Double, Double> {
+
+               @Override
+               public void updateVertex(Vertex<Long, Double> vertex, 
MessageIterator<Double> inMessages) throws Exception {
+                       if (inMessages.hasNext()) {
+                               Long outDegree = getOutDegree() - 1;
+                               // check if the vertex has another SP-Edge
+                               if (outDegree <= 0) {
+                                       // set own value to infinity
+                                       setNewVertexValue(Double.MAX_VALUE);
+                               }
+                       }
+               }
+       }
+
+       public static final class InvalidateMessenger extends 
MessagingFunction<Long, Double, Double, Double> {
+
+               private Edge<Long, Double> edgeToBeRemoved;
+
+               public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) {
+                       this.edgeToBeRemoved = edgeToBeRemoved;
+               }
+
+               @Override
+               public void sendMessages(Vertex<Long, Double> vertex) throws 
Exception {
+
+
+                       if(getSuperstepNumber() == 1) {
+                               
if(vertex.getId().equals(edgeToBeRemoved.getSource())) {
+                                       // activate the edge target
+                                       
sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE);
+                               }
+                       }
+
+                       if(getSuperstepNumber() > 1) {
+                               // invalidate all edges
+                               for(Edge<Long, Double> edge : getEdges()) {
+                                       sendMessageTo(edge.getSource(), 
Double.MAX_VALUE);
+                               }
+                       }
+               }
+       }
+
+       // ******************************************************************************************************************
+       // UTIL METHODS
+       // ******************************************************************************************************************
+
+       // Set to true by parseParameters when all eight arguments are given; getGraph, getSSSPGraph
+       // and getEdgeToBeRemoved then read from these settings instead of IncrementalSSSPData.
+       private static boolean fileOutput = false;
+
+       // Path of the vertex CSV file (args[0]).
+       private static String verticesInputPath = null;
+
+       // Path of the edge CSV file of the full graph (args[1]).
+       private static String edgesInputPath = null;
+
+       // Path of the CSV file holding the edges that belong to the SSSP graph (args[2]).
+       private static String edgesInSSSPInputPath = null;
+
+       // Source vertex id of the edge to remove (args[3]).
+       private static Long srcEdgeToBeRemoved = null;
+
+       // Target vertex id of the edge to remove (args[4]).
+       private static Long trgEdgeToBeRemoved = null;
+
+       // Value of the edge to remove (args[5]).
+       private static Double valEdgeToBeRemoved = null;
+
+       // Output path (args[6]).
+       private static String outputPath = null;
+
+       // Maximum number of iterations (args[7]); 5 unless overridden.
+       private static int maxIterations = 5;
+
+       /**
+        * Parses the command-line arguments.
+        *
+        * <p>With no arguments the example runs on the built-in {@link IncrementalSSSPData}. With exactly
+        * eight arguments, all input/output paths and parameters are taken from them. Any other number
+        * of arguments prints the usage to stderr and aborts.
+        *
+        * @param args the program arguments
+        * @return {@code true} if the program should run, {@code false} if it should exit
+        * @throws NumberFormatException if an id, edge value or the iteration count cannot be parsed
+        */
+       private static boolean parseParameters(String[] args) {
+               if (args.length == 0) {
+                       System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
+                       System.out.println("Provide parameters to read input data from files.");
+                       System.out.println("See the documentation for the correct format of input files.");
+                       System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+                                       "<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
+                                       "<output path> <max iterations>");
+                       return true;
+               }
+
+               if (args.length != 8) {
+                       System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+                                       "<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
+                                       "<output path> <max iterations>");
+                       return false;
+               }
+
+               fileOutput = true;
+               verticesInputPath = args[0];
+               edgesInputPath = args[1];
+               edgesInSSSPInputPath = args[2];
+               srcEdgeToBeRemoved = Long.parseLong(args[3]);
+               trgEdgeToBeRemoved = Long.parseLong(args[4]);
+               valEdgeToBeRemoved = Double.parseDouble(args[5]);
+               outputPath = args[6];
+               maxIterations = Integer.parseInt(args[7]);
+               return true;
+       }
+
+       /**
+        * Returns the full input graph. In file mode it is read from {@code verticesInputPath} and
+        * {@code edgesInputPath}; otherwise it is built from {@link IncrementalSSSPData}.
+        */
+       private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
+               if (!fileOutput) {
+                       return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env),
+                                       IncrementalSSSPData.getDefaultEdgeDataSet(env), env);
+               }
+
+               return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env)
+                               .lineDelimiterEdges("\n")
+                               .types(Long.class, Double.class, Double.class);
+       }
+
+       /**
+        * Returns the graph restricted to the SSSP edges. In file mode it is read from
+        * {@code verticesInputPath} and {@code edgesInSSSPInputPath}; otherwise it is built from
+        * {@link IncrementalSSSPData}.
+        */
+       private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) {
+               if (!fileOutput) {
+                       return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env),
+                                       IncrementalSSSPData.getDefaultEdgesInSSSP(env), env);
+               }
+
+               return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env)
+                               .lineDelimiterEdges("\n")
+                               .types(Long.class, Double.class, Double.class);
+       }
+
+       /**
+        * Returns the edge to remove: built from the parsed command-line values in file mode,
+        * otherwise the default edge from {@link IncrementalSSSPData}.
+        */
+       private static Edge<Long, Double> getEdgeToBeRemoved() {
+               return fileOutput
+                               ? new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved)
+                               : IncrementalSSSPData.getDefaultEdgeToBeRemoved();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java
new file mode 100644
index 0000000..fbd735b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.VertexJoinFunction;
+import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData;
+
+import java.util.HashSet;
+
+/**
+ * This example shows how to use
+ * <ul>
+ *  <li> neighborhood methods
+ *  <li> join with vertices
+ *  <li> triplets
+ * </ul>
+ *
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
+ * of the union of neighbor sets - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ *     Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ *     Edges themselves are separated by newlines. Lines starting with {@code #} are ignored.
+ *     For example: <code>1    2\n1    3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code> JaccardSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasure implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+               // initialize every vertex with a singleton set containing its own id; these sets are
+               // what each vertex contributes when its neighbors gather their neighborhoods below
+               Graph<Long, HashSet<Long>, Double> graph = Graph.fromDataSet(edges,
+                               new MapFunction<Long, HashSet<Long>>() {
+
+                                       @Override
+                                       public HashSet<Long> map(Long id) throws Exception {
+                                               HashSet<Long> neighbors = new HashSet<Long>();
+                                               neighbors.add(id);
+                                               // the set is freshly created here, so no defensive copy is needed
+                                               return neighbors;
+                                       }
+                               }, env);
+
+               // create the set of neighbors
+               DataSet<Tuple2<Long, HashSet<Long>>> computedNeighbors =
+                               graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
+
+               // join with the vertices to update the node values
+               Graph<Long, HashSet<Long>, Double> graphWithVertexValues =
+                               graph.joinWithVertices(computedNeighbors, new VertexJoinFunction<HashSet<Long>,
+                                               HashSet<Long>>() {
+
+                                       @Override
+                                       public HashSet<Long> vertexJoin(HashSet<Long> vertexValue, HashSet<Long> inputValue) {
+                                               return inputValue;
+                                       }
+                               });
+
+               // compare neighbors, compute Jaccard
+               DataSet<Edge<Long, Double>> edgesWithJaccardValues =
+                               graphWithVertexValues.getTriplets().map(new ComputeJaccard());
+
+               // emit result
+               if (fileOutput) {
+                       edgesWithJaccardValues.writeAsCsv(outputPath, "\n", ",");
+
+                       // since file sinks are lazy, we trigger the execution explicitly
+                       env.execute("Executing Jaccard Similarity Measure");
+               } else {
+                       edgesWithJaccardValues.print();
+               }
+
+       }
+
+       @Override
+       public String getDescription() {
+               return "Vertex Jaccard Similarity Measure";
+       }
+
+       /**
+        * Merges the id sets of two neighbors. Reducing over all neighbors of a vertex therefore
+        * yields the set of its neighbor ids, which becomes the vertex value.
+        *
+        * <p>A new set is returned and neither input set is modified.
+        */
+       private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> {
+
+               @Override
+               public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) {
+                       HashSet<Long> merged = new HashSet<Long>(first);
+                       merged.addAll(second);
+                       return merged;
+               }
+       }
+
+       /**
+        * Computes the Jaccard coefficient of an edge x-y from the neighbor sets X and Y of its
+        * endpoints.
+        *
+        * <p>Because size(X) + size(Y) = size(X union Y) + size(X intersect Y), the intersection size
+        * is obtained as size(X) + size(Y) - size(X union Y). The resulting edge value is
+        * size(X intersect Y) / size(X union Y).
+        */
+       private static final class ComputeJaccard implements
+                       MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> {
+
+               @Override
+               public Edge<Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) throws Exception {
+
+                       Vertex<Long, HashSet<Long>> srcVertex = triplet.getSrcVertex();
+                       Vertex<Long, HashSet<Long>> trgVertex = triplet.getTrgVertex();
+
+                       HashSet<Long> neighborSetX = srcVertex.getValue();
+                       HashSet<Long> neighborSetY = trgVertex.getValue();
+
+                       double unionPlusIntersection = neighborSetX.size() + neighborSetY.size();
+                       // within a HashSet, all elements are distinct
+                       HashSet<Long> unionSet = new HashSet<Long>(neighborSetX);
+                       unionSet.addAll(neighborSetY);
+                       double union = unionSet.size();
+                       double intersection = unionPlusIntersection - union;
+
+                       return new Edge<Long, Double>(srcVertex.getId(), trgVertex.getId(), intersection / union);
+               }
+       }
+
+       // *************************************************************************
+       // UTIL METHODS
+       // *************************************************************************
+
+       // set to true by parseParameters when an edge path and an output path are given
+       private static boolean fileOutput = false;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+
+       /**
+        * Parses the command-line arguments.
+        *
+        * @param args either empty (built-in data) or exactly {@code <edge path> <output path>}
+        * @return {@code true} if the program should run, {@code false} on a wrong argument count
+        */
+       private static boolean parseParameters(String[] args) {
+               if (args.length > 0) {
+                       if (args.length != 2) {
+                               System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       edgeInputPath = args[0];
+                       outputPath = args[1];
+               } else {
+                       System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+                       System.out.println("Provide parameters to read input data from files.");
+                       System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+               }
+
+               return true;
+       }
+
+       /**
+        * Reads the tab-separated edge list (skipping lines that start with '#') and assigns every
+        * edge the initial value 0.0. Without file input, the default data set is used instead.
+        */
+       private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+               if (fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .ignoreComments("#")
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+                                               @Override
+                                               public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+                                                       return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+                                               }
+                                       });
+               } else {
+                       return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
new file mode 100644
index 0000000..b7b590d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexJoinFunction;
+import org.apache.flink.graph.examples.data.MusicProfilesData;
+import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
+ * The input is a set of &lt;userId - songId - playCount&gt; triplets and
+ * a set of bad records, i.e. song ids that should not be trusted.
+ * Initially, we use the DataSet API to filter out the bad records.
+ * Then, we use Gelly to create a user -&gt; song weighted bipartite graph and compute
+ * the top song (most listened) per user.
+ * Then, we use the DataSet API again, to create a user-user similarity graph,
+ * based on common songs, where users that are listeners of the same song
+ * are connected. A user-defined threshold on the playcount value
+ * defines when a user is considered to be a listener of a song: the playcount
+ * must be strictly greater than the threshold.
+ * Finally, we use the graph API to run the label propagation community detection algorithm on
+ * the similarity graph.
+ *
+ * The triplets input is expected to be given as one triplet per line,
+ * in the following format: "&lt;userID&gt;\t&lt;songID&gt;\t&lt;playcount&gt;".
+ *
+ * The mismatches input file is expected to contain one mismatch record per line,
+ * in the following format:
+ * "ERROR: &lt;songID trackID&gt; song_title"
+ *
+ * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
+ */
+@SuppressWarnings("serial")
+public class MusicProfiles implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // read the user-song-play triplets
+               DataSet<Tuple3<String, String, Integer>> triplets = getUserSongTripletsData(env);
+
+               // read the mismatches dataset and extract the songIDs
+               DataSet<Tuple1<String>> mismatches = getMismatchesData(env).map(new ExtractMismatchSongIds());
+
+               // filter out the mismatches from the triplets dataset
+               DataSet<Tuple3<String, String, Integer>> validTriplets = triplets
+                               .coGroup(mismatches).where(1).equalTo(0)
+                               .with(new FilterOutMismatches());
+
+               // create a user -> song weighted bipartite graph where the edge weights
+               // correspond to play counts
+               Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env);
+
+               // get the top track (most listened) for each user
+               DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph
+                               .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
+                               .filter(new FilterSongNodes());
+
+               if (fileOutput) {
+                       usersWithTopTrack.writeAsCsv(topTracksOutputPath, "\n", "\t");
+               } else {
+                       usersWithTopTrack.print();
+               }
+
+               // Create a user-user similarity graph, based on common songs, i.e. two users that listen
+               // to the same song are connected. For each song, we create an edge between each pair of
+               // its in-neighbors.
+               //
+               // The threshold is copied into a final local so that the anonymous filter captures it as
+               // a field and ships it with the serialized function. Reading the static field from inside
+               // the function would yield its initializer value (0) in any JVM where parseParameters was
+               // never called (presumably the remote workers in a cluster deployment).
+               final int threshold = playcountThreshold;
+               DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
+                               .getEdges()
+                               // keep only user-song edges whose playcount exceeds the threshold
+                               .filter(new FilterFunction<Edge<String, Integer>>() {
+                                       @Override
+                                       public boolean filter(Edge<String, Integer> edge) {
+                                               return (edge.getValue() > threshold);
+                                       }
+                               }).groupBy(1)
+                               .reduceGroup(new CreateSimilarUserEdges()).distinct();
+
+               Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
+                               new MapFunction<String, Long>() {
+                                       @Override
+                                       public Long map(String value) {
+                                               return 1L;
+                                       }
+                               }, env).getUndirected();
+
+               // Detect user communities using the label propagation library method:
+               // initialize each vertex with a unique numeric label and run the label propagation algorithm
+               DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
+                               .zipWithUniqueId(similarUsersGraph.getVertexIds())
+                               .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
+                                       @Override
+                                       public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
+                                               return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
+                                       }
+                               });
+
+               DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph
+                               .joinWithVertices(idsWithInitialLabels,
+                                               new VertexJoinFunction<Long, Long>() {
+                                                       @Override
+                                                       public Long vertexJoin(Long vertexValue, Long inputValue) {
+                                                               return inputValue;
+                                                       }
+                                               }).run(new LabelPropagation<String, Long, NullValue>(maxIterations));
+
+               if (fileOutput) {
+                       verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t");
+
+                       // since file sinks are lazy, we trigger the execution explicitly
+                       env.execute();
+               } else {
+                       verticesWithCommunity.print();
+               }
+
+       }
+
+       /**
+        * Extracts the song id from a mismatch line of the form "ERROR: &lt;songID trackID&gt; song_title".
+        * The id is the second whitespace-separated token with its first character (the '&lt;') dropped.
+        */
+       public static final class ExtractMismatchSongIds implements MapFunction<String, Tuple1<String>> {
+
+               @Override
+               public Tuple1<String> map(String value) {
+                       String[] tokens = value.split("\\s+");
+                       String songId = tokens[1].substring(1);
+                       return new Tuple1<String>(songId);
+               }
+       }
+
+       /**
+        * Emits the triplets of a song only if no mismatch record exists for that song id.
+        */
+       public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>,
+               Tuple1<String>, Tuple3<String, String, Integer>> {
+
+               @Override
+               public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets,
+                               Iterable<Tuple1<String>> invalidSongs, Collector<Tuple3<String, String, Integer>> out) {
+
+                       if (!invalidSongs.iterator().hasNext()) {
+                               // this is a valid triplet
+                               for (Tuple3<String, String, Integer> triplet : triplets) {
+                                       out.collect(triplet);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Drops the (vertex, "") records produced by GetTopSongPerUser for vertices without a
+        * top song.
+        */
+       public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> {
+               @Override
+               public boolean filter(Tuple2<String, String> value) throws Exception {
+                       return !value.f1.equals("");
+               }
+       }
+
+       /**
+        * Emits (vertex id, target of the out-edge with the highest playcount). Only playcounts
+        * strictly greater than 0 are considered, and on ties the first such edge wins. Vertices
+        * without such an edge (presumably the song vertices) emit an empty song id.
+        */
+       public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue<String, NullValue, Integer,
+               Tuple2<String, String>> {
+
+               @Override
+               public void iterateEdges(Vertex<String, NullValue> vertex,
+                               Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception {
+
+                       int maxPlaycount = 0;
+                       String topSong = "";
+                       for (Edge<String, Integer> edge : edges) {
+                               if (edge.getValue() > maxPlaycount) {
+                                       maxPlaycount = edge.getValue();
+                                       topSong = edge.getTarget();
+                               }
+                       }
+                       out.collect(new Tuple2<String, String>(vertex.getId(), topSong));
+               }
+       }
+
+       /**
+        * For one group of user-song edges (grouped by song in main), emits an edge between every
+        * unordered pair of the group's listeners.
+        */
+       public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>,
+               Edge<String, NullValue>> {
+
+               @Override
+               public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) {
+                       List<String> listeners = new ArrayList<String>();
+                       for (Edge<String, Integer> edge : edges) {
+                               listeners.add(edge.getSource());
+                       }
+                       for (int i = 0; i < listeners.size() - 1; i++) {
+                               for (int j = i + 1; j < listeners.size(); j++) {
+                                       out.collect(new Edge<String, NullValue>(listeners.get(i),
+                                                       listeners.get(j), NullValue.getInstance()));
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "Music Profiles Example";
+       }
+
+       // ******************************************************************************************************************
+       // UTIL METHODS
+       // ******************************************************************************************************************
+
+       // set to true by parseParameters when all six arguments are given
+       private static boolean fileOutput = false;
+
+       // args[0]
+       private static String userSongTripletsInputPath = null;
+
+       // args[1]
+       private static String mismatchesInputPath = null;
+
+       // args[2]
+       private static String topTracksOutputPath = null;
+
+       // args[3]; only set in the JVM that runs main, so it must be captured before use in a function
+       private static int playcountThreshold = 0;
+
+       // args[4]
+       private static String communitiesOutputPath = null;
+
+       // args[5]; passed to LabelPropagation
+       private static int maxIterations = 10;
+
+       /**
+        * Parses the command-line arguments.
+        *
+        * @param args either empty (built-in data) or exactly six arguments as printed in the usage
+        * @return {@code true} if the program should run, {@code false} on a wrong argument count
+        * @throws NumberFormatException if the playcount threshold or iteration count is not an int
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       if (args.length != 6) {
+                               System.err.println("Usage: MusicProfiles <input user song triplets path>" +
+                                               " <input song mismatches path> <output top tracks path> "
+                                               + "<playcount threshold> <output communities path> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       userSongTripletsInputPath = args[0];
+                       mismatchesInputPath = args[1];
+                       topTracksOutputPath = args[2];
+                       playcountThreshold = Integer.parseInt(args[3]);
+                       communitiesOutputPath = args[4];
+                       maxIterations = Integer.parseInt(args[5]);
+               } else {
+                       System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
+                       System.out.println("  Provide parameters to read input data from files.");
+                       System.out.println("  See the documentation for the correct format of input files.");
+                       System.out.println("Usage: MusicProfiles <input user song triplets path>" +
+                                       " <input song mismatches path> <output top tracks path> "
+                                       + "<playcount threshold> <output communities path> <num iterations>");
+               }
+               return true;
+       }
+
+       private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(userSongTripletsInputPath)
+                                       .lineDelimiter("\n").fieldDelimiter("\t")
+                                       .types(String.class, String.class, Integer.class);
+               } else {
+                       return MusicProfilesData.getUserSongTriplets(env);
+               }
+       }
+
+       private static DataSet<String> getMismatchesData(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readTextFile(mismatchesInputPath);
+               } else {
+                       return MusicProfilesData.getMismatches(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
new file mode 100644
index 0000000..c9abf02
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use Gelly's scatter-gather iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a gather-sum-apply implementation of the same algorithm, please refer 
to {@link GSASingleSourceShortestPaths}. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link SingleSourceShortestPathsData}
+ */
+public class SingleSourceShortestPaths implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(srcVertexId), env);
+
+               // Execute the scatter-gather iteration
+               Graph<Long, Double, Double> result = 
graph.runScatterGatherIteration(
+                               new VertexDistanceUpdater(), new 
MinDistanceMessenger(), maxIterations);
+
+               // Extract the vertices as the result
+               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
+
+               // emit result
+               if (fileOutput) {
+                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
",");
+
+                       // since file sinks are lazy, we trigger the execution 
explicitly
+                       env.execute("Single Source Shortest Paths Example");
+               } else {
+                       singleSourceShortestPaths.print();
+               }
+
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Single Source Shortest Path UDFs
+       // 
--------------------------------------------------------------------------------------------
+
+       @SuppressWarnings("serial")
+       private static final class InitVertices implements MapFunction<Long, 
Double>{
+
+               private long srcId;
+
+               public InitVertices(long srcId) {
+                       this.srcId = srcId;
+               }
+
+               public Double map(Long id) {
+                       if (id.equals(srcId)) {
+                               return 0.0;
+                       }
+                       else {
+                               return Double.POSITIVE_INFINITY;
+                       }
+               }
+       }
+
+       /**
+        * Function that updates the value of a vertex by picking the minimum
+        * distance from all incoming messages.
+        */
+       @SuppressWarnings("serial")
+       public static final class VertexDistanceUpdater extends 
VertexUpdateFunction<Long, Double, Double> {
+
+               @Override
+               public void updateVertex(Vertex<Long, Double> vertex, 
MessageIterator<Double> inMessages) {
+
+                       Double minDistance = Double.MAX_VALUE;
+
+                       for (double msg : inMessages) {
+                               if (msg < minDistance) {
+                                       minDistance = msg;
+                               }
+                       }
+
+                       if (vertex.getValue() > minDistance) {
+                               setNewVertexValue(minDistance);
+                       }
+               }
+       }
+
+       /**
+        * Distributes the minimum distance associated with a given vertex 
among all
+        * the target vertices summed up with the edge's value.
+        */
+       @SuppressWarnings("serial")
+       public static final class MinDistanceMessenger extends 
MessagingFunction<Long, Double, Double, Double> {
+
+               @Override
+               public void sendMessages(Vertex<Long, Double> vertex) {
+                       if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+                               for (Edge<Long, Double> edge : getEdges()) {
+                                       sendMessageTo(edge.getTarget(), 
vertex.getValue() + edge.getValue());
+                               }
+                       }
+               }
+       }
+
+       // 
******************************************************************************************************************
+       // UTIL METHODS
+       // 
******************************************************************************************************************
+
+       private static boolean fileOutput = false;
+
+       private static Long srcVertexId = 1l;
+
+       private static String edgesInputPath = null;
+
+       private static String outputPath = null;
+
+       private static int maxIterations = 5;
+
+       private static boolean parseParameters(String[] args) {
+
+               if(args.length > 0) {
+                       if(args.length != 4) {
+                               System.err.println("Usage: 
SingleSourceShortestPaths <source vertex id>" +
+                                               " <input edges path> <output 
path> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       srcVertexId = Long.parseLong(args[0]);
+                       edgesInputPath = args[1];
+                       outputPath = args[2];
+                       maxIterations = Integer.parseInt(args[3]);
+               } else {
+                               System.out.println("Executing Single Source 
Shortest Paths example "
+                                               + "with default parameters and 
built-in default data.");
+                               System.out.println("  Provide parameters to 
read input data from files.");
+                               System.out.println("  See the documentation for 
the correct format of input files.");
+                               System.out.println("Usage: 
SingleSourceShortestPaths <source vertex id>" +
+                                               " <input edges path> <output 
path> <num iterations>");
+               }
+               return true;
+       }
+
+       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       .lineDelimiter("\n")
+                                       .fieldDelimiter("\t")
+                                       .types(Long.class, Long.class, 
Double.class)
+                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
+               } else {
+                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "Scatter-gather Single Source Shortest Paths";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java
new file mode 100644
index 0000000..d3ddfd8
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data set used for the Simple Community Detection test 
program.
+ * If no parameters are given to the program, the default edge data set is 
used.
+ */
+public class CommunityDetectionData {
+
+       // the algorithm is not guaranteed to always converge
+       public static final Integer MAX_ITERATIONS = 30;
+
+       public static final double DELTA = 0.5f;
+
+       public static final String COMMUNITIES_SINGLE_ITERATION = "1,5\n" + 
"2,6\n"
+                       + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + 
"8,7"; 
+
+       public static final String COMMUNITIES_WITH_TIE = "1,2\n" + "2,1\n" + 
"3,1\n" + "4,1\n" + "5,1";
+
+       public static DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
+               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+               edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+               edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+               edges.add(new Edge<Long, Double>(2L, 3L, 4.0));
+               edges.add(new Edge<Long, Double>(2L, 4L, 5.0));
+               edges.add(new Edge<Long, Double>(3L, 5L, 6.0));
+               edges.add(new Edge<Long, Double>(5L, 6L, 7.0));
+               edges.add(new Edge<Long, Double>(5L, 7L, 8.0));
+               edges.add(new Edge<Long, Double>(6L, 7L, 9.0));
+               edges.add(new Edge<Long, Double>(7L, 12L, 10.0));
+               edges.add(new Edge<Long, Double>(8L, 9L, 11.0));
+               edges.add(new Edge<Long, Double>(8L, 10L, 12.0));
+               edges.add(new Edge<Long, Double>(8L, 11L, 13.0));
+               edges.add(new Edge<Long, Double>(9L, 10L, 14.0));
+               edges.add(new Edge<Long, Double>(9L, 11L, 15.0));
+               edges.add(new Edge<Long, Double>(10L, 11L, 16.0));
+               edges.add(new Edge<Long, Double>(10L, 12L, 17.0));
+               edges.add(new Edge<Long, Double>(11L, 12L, 18.0));
+
+               return env.fromCollection(edges);
+       }
+
+       public static DataSet<Edge<Long, Double>> 
getSimpleEdgeDataSet(ExecutionEnvironment env) {
+
+               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
+               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+               edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+               edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+               edges.add(new Edge<Long, Double>(1L, 5L, 4.0));
+               edges.add(new Edge<Long, Double>(2L, 6L, 5.0));
+               edges.add(new Edge<Long, Double>(6L, 7L, 6.0));
+               edges.add(new Edge<Long, Double>(6L, 8L, 7.0));
+               edges.add(new Edge<Long, Double>(7L, 8L, 8.0));
+
+               return env.fromCollection(edges);
+       }
+
+       private CommunityDetectionData() {}
+
+       public static DataSet<Edge<Long, Double>> 
getTieEdgeDataSet(ExecutionEnvironment env) {
+               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
+               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+               edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+               edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+               edges.add(new Edge<Long, Double>(1L, 5L, 1.0));
+
+               return env.fromCollection(edges);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
new file mode 100644
index 0000000..c53f5ba
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the connected components example 
program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class ConnectedComponentsDefaultData {
+
+       public static final Integer MAX_ITERATIONS = 4;
+
+       public static final String EDGES = "1   2\n" + "2       3\n" + "2       
4\n" + "3       4";
+
+       public static final Object[][] DEFAULT_EDGES = new Object[][] {
+               new Object[]{1L, 2L},
+               new Object[]{2L, 3L},
+               new Object[]{2L, 4L},
+               new Object[]{3L, 4L}
+       };
+
+       public static DataSet<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
+               List<Edge<Long, NullValue>> edgeList = new 
LinkedList<Edge<Long, NullValue>>();
+               for (Object[] edge : DEFAULT_EDGES) {
+                       edgeList.add(new Edge<Long, NullValue>((Long) edge[0], 
(Long) edge[1], NullValue.getInstance()));
+               }
+               return env.fromCollection(edgeList);
+       }
+
+       public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + 
"3,1\n" + "4,1";
+
+       private ConnectedComponentsDefaultData() {}
+}

Reply via email to