[FLINK-3511] [gelly] Introduce flink-gelly-examples module The new flink-gelly-examples module contains all Java and Scala Gelly examples. The module contains compile scope dependencies on flink-java, flink-scala and flink-clients so that the examples can be conveniently run from within the IDE.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c605d27 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c605d27 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c605d27 Branch: refs/heads/release-1.0 Commit: 2c605d275b26793d8676e35b6ccc5102bdcbf30d Parents: c0bc8bc Author: Till Rohrmann <[email protected]> Authored: Fri Feb 26 14:08:02 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Fri Feb 26 20:57:04 2016 +0100 ---------------------------------------------------------------------- flink-libraries/flink-gelly-examples/pom.xml | 206 +++++++++++++ .../graph/examples/ConnectedComponents.java | 141 +++++++++ .../graph/examples/EuclideanGraphWeighing.java | 214 +++++++++++++ .../examples/GSASingleSourceShortestPaths.java | 191 ++++++++++++ .../flink/graph/examples/GraphMetrics.java | 170 +++++++++++ .../flink/graph/examples/IncrementalSSSP.java | 266 ++++++++++++++++ .../examples/JaccardSimilarityMeasure.java | 214 +++++++++++++ .../flink/graph/examples/MusicProfiles.java | 303 +++++++++++++++++++ .../examples/SingleSourceShortestPaths.java | 201 ++++++++++++ .../examples/data/CommunityDetectionData.java | 95 ++++++ .../data/ConnectedComponentsDefaultData.java | 57 ++++ .../graph/examples/data/EuclideanGraphData.java | 86 ++++++ .../examples/data/IncrementalSSSPData.java | 95 ++++++ .../data/JaccardSimilarityMeasureData.java | 58 ++++ .../examples/data/LabelPropagationData.java | 114 +++++++ .../graph/examples/data/MusicProfilesData.java | 108 +++++++ .../flink/graph/examples/data/PageRankData.java | 69 +++++ .../data/SingleSourceShortestPathsData.java | 62 ++++ .../graph/examples/data/SummarizationData.java | 134 ++++++++ .../graph/examples/data/TriangleCountData.java | 65 ++++ .../graph/examples/utils/ExampleUtils.java | 162 ++++++++++ .../scala/examples/ConnectedComponents.scala | 120 ++++++++ .../examples/GSASingleSourceShortestPaths.scala | 150 +++++++++ 
.../graph/scala/examples/GraphMetrics.scala | 129 ++++++++ .../examples/SingleSourceShortestPaths.scala | 170 +++++++++++ .../graph/library/CommunityDetectionITCase.java | 81 +++++ .../graph/library/LabelPropagationITCase.java | 79 +++++ .../flink/graph/library/PageRankITCase.java | 128 ++++++++ .../graph/library/SummarizationITCase.java | 188 ++++++++++++ .../graph/library/TriangleCountITCase.java | 53 ++++ .../graph/library/TriangleEnumeratorITCase.java | 57 ++++ .../flink/graph/test/GatherSumApplyITCase.java | 106 +++++++ .../examples/ConnectedComponentsITCase.java | 72 +++++ .../examples/EuclideanGraphWeighingITCase.java | 78 +++++ .../test/examples/IncrementalSSSPITCase.java | 134 ++++++++ .../JaccardSimilarityMeasureITCase.java | 73 +++++ .../test/examples/MusicProfilesITCase.java | 102 +++++++ .../SingleSourceShortestPathsITCase.java | 82 +++++ .../scala/example/ConnectedComponents.scala | 120 -------- .../example/GSASingleSourceShortestPaths.scala | 150 --------- .../graph/scala/example/GraphMetrics.scala | 128 -------- .../example/SingleSourceShortestPaths.scala | 170 ----------- .../graph/example/ConnectedComponents.java | 141 --------- .../graph/example/EuclideanGraphWeighing.java | 214 ------------- .../example/GSASingleSourceShortestPaths.java | 191 ------------ .../flink/graph/example/GraphMetrics.java | 170 ----------- .../flink/graph/example/IncrementalSSSP.java | 266 ---------------- .../graph/example/JaccardSimilarityMeasure.java | 214 ------------- .../flink/graph/example/MusicProfiles.java | 303 ------------------- .../example/SingleSourceShortestPaths.java | 201 ------------ .../example/utils/CommunityDetectionData.java | 95 ------ .../utils/ConnectedComponentsDefaultData.java | 57 ---- .../graph/example/utils/EuclideanGraphData.java | 86 ------ .../flink/graph/example/utils/ExampleUtils.java | 162 ---------- .../example/utils/IncrementalSSSPData.java | 95 ------ .../utils/JaccardSimilarityMeasureData.java | 58 ---- 
.../example/utils/LabelPropagationData.java | 114 ------- .../graph/example/utils/MusicProfilesData.java | 108 ------- .../flink/graph/example/utils/PageRankData.java | 69 ----- .../utils/SingleSourceShortestPathsData.java | 62 ---- .../graph/example/utils/SummarizationData.java | 134 -------- .../graph/example/utils/TriangleCountData.java | 65 ---- .../graph/library/ConnectedComponents.java | 2 +- .../graph/library/GSAConnectedComponents.java | 2 +- .../library/SingleSourceShortestPaths.java | 2 +- .../flink/graph/library/TriangleEnumerator.java | 4 +- ...ctedComponentsWithRandomisedEdgesITCase.java | 93 ++++++ .../flink/graph/test/GatherSumApplyITCase.java | 106 ------- .../test/example/ConnectedComponentsITCase.java | 71 ----- .../example/EuclideanGraphWeighingITCase.java | 77 ----- .../test/example/IncrementalSSSPITCase.java | 133 -------- .../example/JaccardSimilarityMeasureITCase.java | 72 ----- .../graph/test/example/MusicProfilesITCase.java | 101 ------- .../SingleSourceShortestPathsITCase.java | 81 ----- .../test/library/CommunityDetectionITCase.java | 82 ----- ...ctedComponentsWithRandomisedEdgesITCase.java | 94 ------ .../test/library/LabelPropagationITCase.java | 80 ----- .../graph/test/library/PageRankITCase.java | 130 -------- .../graph/test/library/SummarizationITCase.java | 189 ------------ .../graph/test/library/TriangleCountITCase.java | 54 ---- .../test/library/TriangleEnumeratorITCase.java | 58 ---- flink-libraries/pom.xml | 1 + 82 files changed, 4912 insertions(+), 4706 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml new file mode 100644 index 0000000..2b84cc1 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/pom.xml @@ -0,0 
+1,206 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-libraries</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-gelly-examples_2.10</artifactId> + <name>flink-gelly-examples</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-gelly_2.10</artifactId> +
<version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-gelly-scala_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins combine.children="append"> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> +
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> +
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java new file mode 100644 index 0000000..93c801f --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.library.GSAConnectedComponents; +import org.apache.flink.types.NullValue; + +/** + * This example shows how to use Gelly's library methods. + * You can find all available library methods in {@link org.apache.flink.graph.library}. + * + * In particular, this example uses the {@link GSAConnectedComponents} + * library method to compute the connected components of the input graph. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\n1\t3\n</code> defines two edges, + * 1-2 and 1-3.
+ * + * Usage <code>ConnectedComponents <edge path> <output path> + * <number of iterations> </code><br> + * If no parameters are provided, the program is run with default data from + * {@link ConnectedComponentsDefaultData} + */ +public class ConnectedComponents implements ProgramDescription { + + @SuppressWarnings("serial") + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env); + + Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() { + @Override + public Long map(Long value) throws Exception { + return value; + } + }, env); + + DataSet<Vertex<Long, Long>> verticesWithMinIds = graph + .run(new GSAConnectedComponents<Long, NullValue>(maxIterations)); + + // emit result + if (fileOutput) { + verticesWithMinIds.writeAsCsv(outputPath, "\n", ","); + + // since file sinks are lazy, we trigger the execution explicitly + env.execute("Connected Components Example"); + } else { + verticesWithMinIds.print(); + } + } + + @Override + public String getDescription() { + return "Connected Components Example"; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String edgeInputPath = null; + private static String outputPath = null; + private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS; + + private static boolean parseParameters(String [] args) { + if(args.length > 0) { + if(args.length != 3) { + System.err.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>"); + return false; + } + + fileOutput = true; + edgeInputPath = args[0]; + outputPath = args[1]; + maxIterations =
Integer.parseInt(args[2]); + + } else { + System.out.println("Executing ConnectedComponents example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>"); + } + + return true; + } + + @SuppressWarnings("serial") + private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .ignoreComments("#") + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { + @Override + public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { + return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); + } + }); + } else { + return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java new file mode 100644 index 0000000..bd6111d --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/EuclideanGraphWeighing.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.graph.examples.data.EuclideanGraphData; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeJoinFunction; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.Vertex; + +import java.io.Serializable; + +/** + * This example shows how to use Gelly's {@link Graph#getTriplets()} and + * {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)} methods. + * + * Given a directed, unweighted graph, with vertex values representing points in a plane, + * return a weighted graph where the edge weights are equal to the Euclidean distance between the + * src and the trg vertex values. + * + * <p> + * Input files are plain text files and must be formatted as follows: + * <ul> + * <li> Vertices are represented by their vertexIds and vertex values and are separated by newlines, + * the value being formed of two doubles separated by a comma.
+ * For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices + * <li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas. + * Edges themselves are separated by newlines. + * For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3. + * </ul> + * + * Usage <code>EuclideanGraphWeighing <vertex path> <edge path> <output path></code><br> + * If no parameters are provided, the program is run with default data from + * {@link EuclideanGraphData} + */ +@SuppressWarnings("serial") +public class EuclideanGraphWeighing implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env); + + // the edge value will be the Euclidean distance between its src and trg vertex + DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets() + .map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() { + + @Override + public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet) + throws Exception { + + Vertex<Long, Point> srcVertex = triplet.getSrcVertex(); + Vertex<Long, Point> trgVertex = triplet.getTrgVertex(); + + return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(), + srcVertex.getValue().euclideanDistance(trgVertex.getValue())); + } + }); + + Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight, + new EdgeJoinFunction<Double, Double>() { + @Override + public Double edgeJoin(Double edgeValue, Double inputValue) { + return inputValue; + } + }); + + // retrieve the edges from the final result + DataSet<Edge<Long, Double>> result =
resultedGraph.getEdges(); + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", ","); + + // since file sinks are lazy, we trigger the execution explicitly + env.execute("Euclidean Graph Weighing Example"); + } else { + result.print(); + } + + } + + @Override + public String getDescription() { + return "Weighing a graph by computing the Euclidean distance " + + "between its vertices"; + } + + // ************************************************************************* + // DATA TYPES + // ************************************************************************* + + /** + * A simple two-dimensional point. + */ + public static class Point implements Serializable { + + public double x, y; + + public Point() {} + + public Point(double x, double y) { + this.x = x; + this.y = y; + } + + public double euclideanDistance(Point other) { + return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y)); + } + + @Override + public String toString() { + return x + " " + y; + } + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static String verticesInputPath = null; + + private static String edgesInputPath = null; + + private static String outputPath = null; + + private static boolean parseParameters(String[] args) { + if (args.length > 0) { + if (args.length != 3) { + System.err.println("Usage:
EuclideanGraphWeighing <input vertices path> <input edges path>" + + " <output path>"); + return false; + } + fileOutput = true; + verticesInputPath = args[0]; + edgesInputPath = args[1]; + outputPath = args[2]; + } else { + System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("See the documentation for the correct format of input files."); + System.out.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path> <output path>"); + } + return true; + } + + private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(verticesInputPath) + .lineDelimiter("\n") + .types(Long.class, Double.class, Double.class) + .map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() { + + @Override + public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception { + return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2)); + } + }); + } else { + return EuclideanGraphData.getDefaultVertexDataSet(env); + } + } + + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() { + + @Override + public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception { + return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0); + } + }); + } else { + return EuclideanGraphData.getDefaultEdgeDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java new file mode 100755 index 0000000..1732016 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.gsa.Neighbor; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; + +/** + * This example shows how to use Gelly's Gather-Sum-Apply iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. + * Usage <code>GSASingleSourceShortestPaths <source vertex id> <input edges path> <output path> <num iterations></code><br> + * If no parameters are provided, the program is run with default data from + * {@link SingleSourceShortestPathsData} + */ +public class GSASingleSourceShortestPaths implements ProgramDescription { + + // -------------------------------------------------------------------------------------------- + // Program + // -------------------------------------------------------------------------------------------- + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env); + + Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env); + + // Execute the GSA iteration + Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration( + new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations); + + // Extract the vertices as the result + DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); + + // emit result + if(fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ","); + + // since file sinks are lazy, we trigger the execution explicitly + env.execute("GSA Single Source Shortest Paths"); + } else { + singleSourceShortestPaths.print(); + } + + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("serial") + private static final class InitVertices implements MapFunction<Long, Double> { + private final long srcId; + + public InitVertices(long 
srcId) { + this.srcId = srcId; + } + + @Override + public Double map(Long id) { + if (id.equals(srcId)) { +
return 0.0; + } + else { + return Double.POSITIVE_INFINITY; + } + } + } + + @SuppressWarnings("serial") + private static final class CalculateDistances extends GatherFunction<Double, Double, Double> { + @Override + public Double gather(Neighbor<Double, Double> neighbor) { + return neighbor.getNeighborValue() + neighbor.getEdgeValue(); + } + } + + @SuppressWarnings("serial") + private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> { + @Override + public Double sum(Double newValue, Double currentValue) { + return Math.min(newValue, currentValue); + } + } + + @SuppressWarnings("serial") + private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> { + @Override + public void apply(Double newDistance, Double oldDistance) { + if (newDistance < oldDistance) { + setResult(newDistance); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Util methods + // -------------------------------------------------------------------------------------------- + + private static boolean fileOutput = false; + + private static Long srcVertexId = 1L; + + private static String edgesInputPath = null; + + private static String outputPath = null; + + private static int maxIterations = 5; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + return false; + } + + fileOutput = true; + srcVertexId = Long.parseLong(args[0]); + edgesInputPath = args[1]; + outputPath = args[2]; + maxIterations = Integer.parseInt(args[3]); + } else { + System.out.println("Executing GSASingle Source Shortest Paths example " + + "with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the 
correct format of
input files."); + System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + } + return true; + } + + private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new Tuple3ToEdgeMap<Long, Double>()); + } else { + return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); + } + } + + @Override + public String getDescription() { + return "GSA Single Source Shortest Paths"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java new file mode 100644 index 0000000..9058538 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.graph.examples.utils.ExampleUtils; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.NullValue; + +/** + * This example illustrates how to use Gelly metrics methods and get simple statistics + * from the input graph. + * + * The program creates a random graph and computes and prints + * the following metrics: + * - number of vertices + * - number of edges + * - average node degree + * - the vertex ids with the max/min in- and out-degrees + * + * The input file is expected to contain one edge per line, + * with long IDs and no values, in the following format: + * "<sourceVertexID>\t<targetVertexID>". + * If no arguments are provided, the example runs with a random graph of 100 vertices. 
+ * + */ +public class GraphMetrics implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + /** create the graph **/ + Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env); + + /** get the number of vertices **/ + long numVertices = graph.numberOfVertices(); + + /** get the number of edges **/ + long numEdges = graph.numberOfEdges(); + + /** compute the average node degree **/ + DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees(); + + DataSet<Double> avgNodeDegree = verticesWithDegrees + .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices)); + + /** find the vertex with the maximum in-degree **/ + DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId()); + + /** find the vertex with the minimum in-degree **/ + DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId()); + + /** find the vertex with the maximum out-degree **/ + DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId()); + + /** find the vertex with the minimum out-degree **/ + DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId()); + + /** print the results **/ + ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices"); + ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges"); + ExampleUtils.printResult(avgNodeDegree, "Average node degree"); + ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree"); + ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree"); + ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree"); + ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree"); + + env.execute(); + } + + 
@SuppressWarnings("serial") + private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> { + + private long numberOfVertices; + + public AvgNodeDegreeMapper(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + public Double map(Tuple2<Long, Long> sumTuple) { + return (double) (sumTuple.f1 / numberOfVertices) ; + } + } + + @SuppressWarnings("serial") + private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> { + public Long map(Tuple2<Long, Long> value) { return value.f0; } + } + + @Override + public String getDescription() { + return "Graph Metrics Example"; + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static String edgesInputPath = null; + + static final int NUM_VERTICES = 100; + + static final long SEED = 9876; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 1) { + System.err.println("Usage: GraphMetrics <input edges>"); + return false; + } + + fileOutput = true; + edgesInputPath = args[0]; + } else { + System.out.println("Executing Graph Metrics example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println("Usage: GraphMetrics <input edges>"); + } + return true; + } + + @SuppressWarnings("serial") + private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .lineDelimiter("\n").fieldDelimiter("\t") + .types(Long.class, Long.class).map( + new 
MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { + + public Edge<Long, NullValue> map(Tuple2<Long, Long> value) { + return new Edge<Long, NullValue>(value.f0, value.f1, + NullValue.getInstance()); + } + }); + } else { + return ExampleUtils.getRandomEdges(env, NUM_VERTICES); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java new file mode 100644 index 0000000..26e419f --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.examples.data.IncrementalSSSPData; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterGatherConfiguration; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +/** + * This example illustrates how to + * <ul> + * <li> create a Graph directly from CSV files + * <li> use the scatter-gather iteration's messaging direction configuration option + * </ul> + * + * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated + * upon edge removal. + * + * The program takes as input the resulted graph after a SSSP computation, + * an edge to be removed and the initial graph(i.e. before SSSP was computed). + * In the following description, SP-graph is used as an abbreviation for + * the graph resulted from the SSSP computation. We denote the edges that belong to this + * graph by SP-edges. + * + * - If the removed edge does not belong to the SP-graph, no computation is necessary. + * The edge is simply removed from the graph. + * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge, + * potentially require re-computation. + * When the edge {@code <u, v>} is removed, v checks if it has another out-going SP-edge. + * If yes, no further computation is required. + * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF. 
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message. + * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge. + * If not, it invalidates its current value and propagates the INVALIDATE message. + * The propagation stops when a vertex with an alternative shortest path is reached + * or when we reach a vertex with no SP-in-neighbors. + * + * Usage <code>IncrementalSSSP <vertex path> <edge path> <edges in SSSP> + * <src id edge to be removed> <trg id edge to be removed> <val edge to be removed> + * <result path> <number of iterations></code><br> + * If no parameters are provided, the program is run with default data from + * {@link IncrementalSSSPData} + */ +@SuppressWarnings("serial") +public class IncrementalSSSP implements ProgramDescription { + + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved(); + + Graph<Long, Double, Double> graph = IncrementalSSSP.getGraph(env); + + // Assumption: all minimum weight paths are kept + Graph<Long, Double, Double> ssspGraph = IncrementalSSSP.getSSSPGraph(env); + + // remove the edge + graph.removeEdge(edgeToBeRemoved); + + // configure the iteration + ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); + + if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) { + + parameters.setDirection(EdgeDirection.IN); + parameters.setOptDegrees(true); + + // run the scatter-gather iteration to propagate info + Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(), + new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters); + + DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices(); + + // Emit results + if(fileOutput) { + resultedVertices.writeAsCsv(outputPath, "\n", ","); 
+ env.execute("Incremental SSSP Example"); + } else { + resultedVertices.print(); + } + } else { + // print the vertices + if(fileOutput) { + graph.getVertices().writeAsCsv(outputPath, "\n", ","); + env.execute("Incremental SSSP Example"); + } else { + graph.getVertices().print(); + } + } + } + + @Override + public String getDescription() { + return "Incremental Single Sink Shortest Paths Example"; + } + + // ****************************************************************************************************************** + // IncrementalSSSP METHODS + // ****************************************************************************************************************** + + /** + * Function that verifies whether the edge to be removed is part of the SSSP or not. + * If it is, the src vertex will be invalidated. + * + * @param edgeToBeRemoved + * @param edgesInSSSP + * @return true or false + */ + public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception { + + return edgesInSSSP.filter(new FilterFunction<Edge<Long, Double>>() { + @Override + public boolean filter(Edge<Long, Double> edge) throws Exception { + return edge.equals(edgeToBeRemoved); + } + }).count() > 0; + } + + public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> { + + @Override + public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception { + if (inMessages.hasNext()) { + Long outDegree = getOutDegree() - 1; + // check if the vertex has another SP-Edge + if (outDegree <= 0) { + // set own value to infinity + setNewVertexValue(Double.MAX_VALUE); + } + } + } + } + + public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> { + + private Edge<Long, Double> edgeToBeRemoved; + + public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) { + this.edgeToBeRemoved = edgeToBeRemoved; + } + + @Override + 
public void sendMessages(Vertex<Long, Double> vertex) throws Exception { + + + if(getSuperstepNumber() == 1) { + if(vertex.getId().equals(edgeToBeRemoved.getSource())) { + // activate the edge target + sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE); + } + } + + if(getSuperstepNumber() > 1) { + // invalidate all edges + for(Edge<Long, Double> edge : getEdges()) { + sendMessageTo(edge.getSource(), Double.MAX_VALUE); + } + } + } + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static String verticesInputPath = null; + + private static String edgesInputPath = null; + + private static String edgesInSSSPInputPath = null; + + private static Long srcEdgeToBeRemoved = null; + + private static Long trgEdgeToBeRemoved = null; + + private static Double valEdgeToBeRemoved = null; + + private static String outputPath = null; + + private static int maxIterations = 5; + + private static boolean parseParameters(String[] args) { + if (args.length > 0) { + if (args.length == 8) { + fileOutput = true; + verticesInputPath = args[0]; + edgesInputPath = args[1]; + edgesInSSSPInputPath = args[2]; + srcEdgeToBeRemoved = Long.parseLong(args[3]); + trgEdgeToBeRemoved = Long.parseLong(args[4]); + valEdgeToBeRemoved = Double.parseDouble(args[5]); + outputPath = args[6]; + maxIterations = Integer.parseInt(args[7]); + } else { + System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("See the documentation for the correct format of input files."); + System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " + + "<src id edge to be removed> 
<trg id edge to be removed> <val edge to be removed> " + + "<output path> <max iterations>"); + + return false; + } + } + return true; + } + + private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) { + if(fileOutput) { + return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env).lineDelimiterEdges("\n") + .types(Long.class, Double.class, Double.class); + } else { + return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgeDataSet(env), env); + } + } + + private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) { + if(fileOutput) { + return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n") + .types(Long.class, Double.class, Double.class); + } else { + return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgesInSSSP(env), env); + } + } + + private static Edge<Long, Double> getEdgeToBeRemoved() { + if (fileOutput) { + return new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved); + } else { + return IncrementalSSSPData.getDefaultEdgeToBeRemoved(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java new file mode 100644 index 0000000..fbd735b --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.ReduceNeighborsFunction; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.VertexJoinFunction; +import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData; + +import java.util.HashSet; + +/** + * This example shows how to use + * <ul> + * <li> neighborhood methods + * <li> join with vertices + * <li> triplets + * </ul> + * + * Given a directed, unweighted graph, return a weighted graph where the edge values are equal + * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size + * of the union of neighbor sets - for the src and target vertices. 
+ * + * <p> + * Input files are plain text files and must be formatted as follows: + * <br> + * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. + * Edges themselves are separated by newlines. + * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3. + * </p> + * + * Usage <code> JaccardSimilarityMeasure <edge path> <result path></code><br> + * If no parameters are provided, the program is run with default data from + * {@link JaccardSimilarityMeasureData} + */ +@SuppressWarnings("serial") +public class JaccardSimilarityMeasure implements ProgramDescription { + + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, HashSet<Long>, Double> graph = Graph.fromDataSet(edges, + new MapFunction<Long, HashSet<Long>>() { + + @Override + public HashSet<Long> map(Long id) throws Exception { + HashSet<Long> neighbors = new HashSet<Long>(); + neighbors.add(id); + + return new HashSet<Long>(neighbors); + } + }, env); + + // create the set of neighbors + DataSet<Tuple2<Long, HashSet<Long>>> computedNeighbors = + graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL); + + // join with the vertices to update the node values + Graph<Long, HashSet<Long>, Double> graphWithVertexValues = + graph.joinWithVertices(computedNeighbors, new VertexJoinFunction<HashSet<Long>, + HashSet<Long>>() { + + public HashSet<Long> vertexJoin(HashSet<Long> vertexValue, HashSet<Long> inputValue) { + return inputValue; + } + }); + + // compare neighbors, compute Jaccard + DataSet<Edge<Long, Double>> edgesWithJaccardValues = + graphWithVertexValues.getTriplets().map(new ComputeJaccard()); + + // emit result + if (fileOutput) { + edgesWithJaccardValues.writeAsCsv(outputPath, "\n", ","); + + // since file sinks are lazy, we trigger the execution 
explicitly + env.execute("Executing Jaccard Similarity Measure"); + } else { + edgesWithJaccardValues.print(); + } + + } + + @Override + public String getDescription() { + return "Vertex Jaccard Similarity Measure"; + } + + /** + * Each vertex will have a HashSet containing its neighbor ids as value. + */ + private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> { + + @Override + public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) { + first.addAll(second); + return new HashSet<Long>(first); + } + } + + /** + * The edge weight will be the Jaccard coefficient, which is computed as follows: + * + * Consider the edge x-y + * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively. + * sizeX+sizeY = union + intersection of neighborhoods + * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods + * The intersection can then be deduced. + * + * The Jaccard similarity coefficient is then, the intersection/union. 
+ */ + private static final class ComputeJaccard implements + MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> { + + @Override + public Edge<Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) throws Exception { + + Vertex<Long, HashSet<Long>> srcVertex = triplet.getSrcVertex(); + Vertex<Long, HashSet<Long>> trgVertex = triplet.getTrgVertex(); + + Long x = srcVertex.getId(); + Long y = trgVertex.getId(); + HashSet<Long> neighborSetY = trgVertex.getValue(); + + double unionPlusIntersection = srcVertex.getValue().size() + neighborSetY.size(); + // within a HashSet, all elements are distinct + HashSet<Long> unionSet = new HashSet<Long>(); + unionSet.addAll(srcVertex.getValue()); + unionSet.addAll(neighborSetY); + double union = unionSet.size(); + double intersection = unionPlusIntersection - union; + + return new Edge<Long, Double>(x, y, intersection/union); + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String edgeInputPath = null; + private static String outputPath = null; + + private static boolean parseParameters(String [] args) { + if(args.length > 0) { + if(args.length != 2) { + System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>"); + return false; + } + + fileOutput = true; + edgeInputPath = args[0]; + outputPath = args[1]; + } else { + System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>"); + } + + return true; + } + + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .ignoreComments("#") + 
.fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() { + @Override + public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception { + return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0)); + } + }); + } else { + return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java new file mode 100644 index 0000000..b7b590d --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunctionWithVertexValue; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.VertexJoinFunction; +import org.apache.flink.graph.examples.data.MusicProfilesData; +import org.apache.flink.graph.library.LabelPropagation; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +/** + * This example demonstrates how to mix the DataSet Flink API with the Gelly API. + * The input is a set <userId - songId - playCount> triplets and + * a set of bad records, i.e. song ids that should not be trusted. + * Initially, we use the DataSet API to filter out the bad records. + * Then, we use Gelly to create a user -> song weighted bipartite graph and compute + * the top song (most listened) per user. + * Then, we use the DataSet API again, to create a user-user similarity graph, + * based on common songs, where users that are listeners of the same song + * are connected. A user-defined threshold on the playcount value + * defines when a user is considered to be a listener of a song. 
+ * Finally, we use the graph API to run the label propagation community detection algorithm on + * the similarity graph. + * + * The triplets input is expected to be given as one triplet per line, + * in the following format: "<userID>\t<songID>\t<playcount>". + * + * The mismatches input file is expected to contain one mismatch record per line, + * in the following format: + * "ERROR: <songID trackID> song_title" + * + * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}. + */ +@SuppressWarnings("serial") +public class MusicProfiles implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + /** + * Read the user-song-play triplets. + */ + DataSet<Tuple3<String, String, Integer>> triplets = getUserSongTripletsData(env); + + /** + * Read the mismatches dataset and extract the songIDs + */ + DataSet<Tuple1<String>> mismatches = getMismatchesData(env).map(new ExtractMismatchSongIds()); + + /** + * Filter out the mismatches from the triplets dataset + */ + DataSet<Tuple3<String, String, Integer>> validTriplets = triplets + .coGroup(mismatches).where(1).equalTo(0) + .with(new FilterOutMismatches()); + + /** + * Create a user -> song weighted bipartite graph where the edge weights + * correspond to play counts + */ + Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env); + + /** + * Get the top track (most listened) for each user + */ + DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph + .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT) + .filter(new FilterSongNodes()); + + if (fileOutput) { + usersWithTopTrack.writeAsCsv(topTracksOutputPath, "\n", "\t"); + } else { + usersWithTopTrack.print(); + } + + /** + * Create a user-user similarity graph, based on common songs, i.e. 
two + * users that listen to the same song are connected. For each song, we + * create an edge between each pair of its in-neighbors. + */ + DataSet<Edge<String, NullValue>> similarUsers = userSongGraph + .getEdges() + // filter out user-song edges that are below the playcount threshold + .filter(new FilterFunction<Edge<String, Integer>>() { + public boolean filter(Edge<String, Integer> edge) { + return (edge.getValue() > playcountThreshold); + } + }).groupBy(1) + .reduceGroup(new CreateSimilarUserEdges()).distinct(); + + Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, + new MapFunction<String, Long>() { + public Long map(String value) { + return 1l; + } + }, env).getUndirected(); + + /** + * Detect user communities using the label propagation library method + */ + // Initialize each vertex with a unique numeric label and run the label propagation algorithm + DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils + .zipWithUniqueId(similarUsersGraph.getVertexIds()) + .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() { + @Override + public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception { + return new Tuple2<String, Long>(tuple2.f1, tuple2.f0); + } + }); + + DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph + .joinWithVertices(idsWithInitialLabels, + new VertexJoinFunction<Long, Long>() { + public Long vertexJoin(Long vertexValue, Long inputValue) { + return inputValue; + } + }).run(new LabelPropagation<String, Long, NullValue>(maxIterations)); + + if (fileOutput) { + verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t"); + + // since file sinks are lazy, we trigger the execution explicitly + env.execute(); + } else { + verticesWithCommunity.print(); + } + + } + + public static final class ExtractMismatchSongIds implements MapFunction<String, Tuple1<String>> { + + public Tuple1<String> map(String value) { + String[] tokens = value.split("\\s+"); 
+ String songId = tokens[1].substring(1); + return new Tuple1<String>(songId); + } + } + + public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>, + Tuple1<String>, Tuple3<String, String, Integer>> { + + public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets, + Iterable<Tuple1<String>> invalidSongs, Collector<Tuple3<String, String, Integer>> out) { + + if (!invalidSongs.iterator().hasNext()) { + // this is a valid triplet + for (Tuple3<String, String, Integer> triplet : triplets) { + out.collect(triplet); + } + } + } + } + + public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> { + public boolean filter(Tuple2<String, String> value) throws Exception { + return !value.f1.equals(""); + } + } + + public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue<String, NullValue, Integer, + Tuple2<String, String>> { + + public void iterateEdges(Vertex<String, NullValue> vertex, + Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception { + + int maxPlaycount = 0; + String topSong = ""; + for (Edge<String, Integer> edge : edges) { + if (edge.getValue() > maxPlaycount) { + maxPlaycount = edge.getValue(); + topSong = edge.getTarget(); + } + } + out.collect(new Tuple2<String, String>(vertex.getId(), topSong)); + } + } + + public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>, + Edge<String, NullValue>> { + + public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) { + List<String> listeners = new ArrayList<String>(); + for (Edge<String, Integer> edge : edges) { + listeners.add(edge.getSource()); + } + for (int i = 0; i < listeners.size() - 1; i++) { + for (int j = i + 1; j < listeners.size(); j++) { + out.collect(new Edge<String, NullValue>(listeners.get(i), + listeners.get(j), NullValue.getInstance())); + } + } + } + } + + 
@Override + public String getDescription() { + return "Music Profiles Example"; + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static String userSongTripletsInputPath = null; + + private static String mismatchesInputPath = null; + + private static String topTracksOutputPath = null; + + private static int playcountThreshold = 0; + + private static String communitiesOutputPath = null; + + private static int maxIterations = 10; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 6) { + System.err.println("Usage: MusicProfiles <input user song triplets path>" + + " <input song mismatches path> <output top tracks path> " + + "<playcount threshold> <output communities path> <num iterations>"); + return false; + } + + fileOutput = true; + userSongTripletsInputPath = args[0]; + mismatchesInputPath = args[1]; + topTracksOutputPath = args[2]; + playcountThreshold = Integer.parseInt(args[3]); + communitiesOutputPath = args[4]; + maxIterations = Integer.parseInt(args[5]); + } else { + System.out.println("Executing Music Profiles example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println("Usage: MusicProfiles <input user song triplets path>" + + " <input song mismatches path> <output top tracks path> " + + "<playcount threshold> <output communities path> <num iterations>"); + } + return true; + } + + private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) { + if (fileOutput) { + return 
env.readCsvFile(userSongTripletsInputPath) + .lineDelimiter("\n").fieldDelimiter("\t") + .types(String.class, String.class, Integer.class); + } else { + return MusicProfilesData.getUserSongTriplets(env); + } + } + + private static DataSet<String> getMismatchesData(ExecutionEnvironment env) { + if (fileOutput) { + return env.readTextFile(mismatchesInputPath); + } else { + return MusicProfilesData.getMismatches(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java new file mode 100644 index 0000000..c9abf02 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use Gelly's scatter-gather iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link SingleSourceShortestPathsData}
+ */
+public class SingleSourceShortestPaths implements ProgramDescription {
+
+	/**
+	 * Entry point.
+	 *
+	 * @param args either empty (built-in data, results printed to stdout) or exactly
+	 *             {@code <source vertex id> <input edges path> <output path> <num iterations>}
+	 */
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		// the source vertex starts at distance 0.0, every other vertex at POSITIVE_INFINITY
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+
+		// Execute the scatter-gather iteration
+		Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
+				new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("Single Source Shortest Paths Example");
+		} else {
+			singleSourceShortestPaths.print();
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Computes the initial vertex value: 0.0 for the source vertex and
+	 * {@link Double#POSITIVE_INFINITY} ("not reached yet") for every other vertex.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double> {
+
+		private final long srcId;
+
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
+		}
+
+		@Override
+		public Double map(Long id) {
+			if (id.equals(srcId)) {
+				return 0.0;
+			}
+			else {
+				return Double.POSITIVE_INFINITY;
+			}
+		}
+	}
+
+	/**
+	 * Function that updates the value of a vertex by picking the minimum
+	 * distance from all incoming messages. The vertex value only changes when
+	 * that minimum is strictly smaller than the current distance.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
+
+			// Seed with the same "not reached" sentinel that InitVertices and
+			// MinDistanceMessenger use (POSITIVE_INFINITY rather than MAX_VALUE), so that
+			// an empty or all-infinite message stream never turns an unreached vertex
+			// into a finite distance. A primitive double also avoids boxing per message.
+			double minDistance = Double.POSITIVE_INFINITY;
+
+			for (double msg : inMessages) {
+				if (msg < minDistance) {
+					minDistance = msg;
+				}
+			}
+
+			if (vertex.getValue() > minDistance) {
+				setNewVertexValue(minDistance);
+			}
+		}
+	}
+
+	/**
+	 * Distributes the minimum distance associated with a given vertex among all
+	 * the target vertices summed up with the edge's value. Vertices whose distance
+	 * is still {@link Double#POSITIVE_INFINITY} (not reached yet) send nothing.
+	 */
+	@SuppressWarnings("serial")
+	public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Double> vertex) {
+			if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+				for (Edge<Long, Double> edge : getEdges()) {
+					sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+				}
+			}
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	// uppercase 'L' suffix: a lowercase 'l' is easily misread as the digit 1
+	private static long srcVertexId = 1L;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	/**
+	 * Parses the command-line arguments into the static configuration fields.
+	 * With no arguments, the built-in default data is used and results are printed.
+	 * With exactly four arguments, file input/output is enabled.
+	 *
+	 * @return false (after printing the usage to stderr) for any other argument count
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+			System.out.println("Executing Single Source Shortest Paths example "
+					+ "with default parameters and built-in default data.");
+			System.out.println(" Provide parameters to read input data from files.");
+			System.out.println(" See the documentation for the correct format of input files.");
+			System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+					" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	/**
+	 * Returns the input edges. In file mode it reads tab-separated
+	 * (source, target, distance) lines from the edges input path. Otherwise it uses
+	 * {@link SingleSourceShortestPathsData#getDefaultEdgeDataSet}.
+	 */
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n")
+					.fieldDelimiter("\t")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "Scatter-gather Single Source Shortest Paths";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java
new file mode 100644
index 0000000..d3ddfd8
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/CommunityDetectionData.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data set used for the Simple Community Detection test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ *
+ * All graphs here use {@code Long} vertex ids and {@code Double} edge weights.
+ */
+public class CommunityDetectionData {
+
+	/** Static data holder; not meant to be instantiated. */
+	private CommunityDetectionData() {}
+
+	// the algorithm is not guaranteed to always converge
+	public static final Integer MAX_ITERATIONS = 30;
+
+	// NOTE(review): presumably the delta parameter handed to the community detection
+	// algorithm together with this data -- confirm against the consuming programs/tests.
+	public static final double DELTA = 0.5;
+
+	// "vertex,value" lines for vertices 1-8 (the vertices of getSimpleEdgeDataSet);
+	// presumably the expected communities after a single iteration -- TODO confirm.
+	public static final String COMMUNITIES_SINGLE_ITERATION = "1,5\n" + "2,6\n"
+			+ "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
+
+	// "vertex,value" lines for vertices 1-5 (the vertices of getTieEdgeDataSet);
+	// presumably the expected communities when labels tie -- TODO confirm.
+	public static final String COMMUNITIES_WITH_TIE = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+
+	/** Creates one weighted edge; keeps the edge tables below compact. */
+	private static Edge<Long, Double> edge(long source, long target, double weight) {
+		return new Edge<Long, Double>(source, target, weight);
+	}
+
+	/** Returns the 18-edge default graph over vertices 1-12. Edge weights are 1.0 to 18.0. */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, Double>> result = new ArrayList<Edge<Long, Double>>();
+
+		result.add(edge(1L, 2L, 1.0));
+		result.add(edge(1L, 3L, 2.0));
+		result.add(edge(1L, 4L, 3.0));
+		result.add(edge(2L, 3L, 4.0));
+		result.add(edge(2L, 4L, 5.0));
+		result.add(edge(3L, 5L, 6.0));
+		result.add(edge(5L, 6L, 7.0));
+		result.add(edge(5L, 7L, 8.0));
+		result.add(edge(6L, 7L, 9.0));
+		result.add(edge(7L, 12L, 10.0));
+		result.add(edge(8L, 9L, 11.0));
+		result.add(edge(8L, 10L, 12.0));
+		result.add(edge(8L, 11L, 13.0));
+		result.add(edge(9L, 10L, 14.0));
+		result.add(edge(9L, 11L, 15.0));
+		result.add(edge(10L, 11L, 16.0));
+		result.add(edge(10L, 12L, 17.0));
+		result.add(edge(11L, 12L, 18.0));
+
+		return env.fromCollection(result);
+	}
+
+	/** Returns the 8-edge graph over vertices 1-8. Edge weights are 1.0 to 8.0. */
+	public static DataSet<Edge<Long, Double>> getSimpleEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, Double>> result = new ArrayList<Edge<Long, Double>>();
+
+		result.add(edge(1L, 2L, 1.0));
+		result.add(edge(1L, 3L, 2.0));
+		result.add(edge(1L, 4L, 3.0));
+		result.add(edge(1L, 5L, 4.0));
+		result.add(edge(2L, 6L, 5.0));
+		result.add(edge(6L, 7L, 6.0));
+		result.add(edge(6L, 8L, 7.0));
+		result.add(edge(7L, 8L, 8.0));
+
+		return env.fromCollection(result);
+	}
+
+	/** Returns a star from vertex 1 to vertices 2-5. Every edge weight is 1.0. */
+	public static DataSet<Edge<Long, Double>> getTieEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, Double>> result = new ArrayList<Edge<Long, Double>>();
+
+		result.add(edge(1L, 2L, 1.0));
+		result.add(edge(1L, 3L, 1.0));
+		result.add(edge(1L, 4L, 1.0));
+		result.add(edge(1L, 5L, 1.0));
+
+		return env.fromCollection(result);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java new
file mode 100644 index 0000000..c53f5ba --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/ConnectedComponentsDefaultData.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples.data; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.types.NullValue; + +import java.util.LinkedList; +import java.util.List; + +/** + * Provides the default data sets used for the connected components example program. + * If no parameters are given to the program, the default data sets are used. 
+ */
+public class ConnectedComponentsDefaultData {
+
+	/** Static data holder; not meant to be instantiated. */
+	private ConnectedComponentsDefaultData() {}
+
+	public static final Integer MAX_ITERATIONS = 4;
+
+	/** The default edges as text: one "source target" pair per line. */
+	public static final String EDGES = "1 2\n" + "2 3\n" + "2 4\n" + "3 4";
+
+	/** The default edges as {source, target} pairs; both entries are {@code Long} ids. */
+	public static final Object[][] DEFAULT_EDGES = new Object[][] {
+		new Object[]{1L, 2L},
+		new Object[]{2L, 3L},
+		new Object[]{2L, 4L},
+		new Object[]{3L, 4L}
+	};
+
+	// "vertex,value" lines mapping each of the vertices 1-4 to 1, the smallest id;
+	// presumably the expected connected-components result -- TODO confirm against tests.
+	public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1";
+
+	/**
+	 * Builds a data set with one edge per {@link #DEFAULT_EDGES} entry, in array order.
+	 * Each edge carries {@link NullValue#getInstance()} as its value.
+	 */
+	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, NullValue>> collected = new LinkedList<Edge<Long, NullValue>>();
+
+		for (int i = 0; i < DEFAULT_EDGES.length; i++) {
+			Object[] pair = DEFAULT_EDGES[i];
+			Long source = (Long) pair[0];
+			Long target = (Long) pair[1];
+			collected.add(new Edge<Long, NullValue>(source, target, NullValue.getInstance()));
+		}
+
+		return env.fromCollection(collected);
+	}
+}
