Repository: flink Updated Branches: refs/heads/master a9fc71d85 -> c71675f7c
[FLINK-3768] [gelly] Local Clustering Coefficient The local clustering coefficient measures the connectedness of each vertex's neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0 (neighborhood is a clique). This closes #1896 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c71675f7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c71675f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c71675f7 Branch: refs/heads/master Commit: c71675f7cbe7e538d62bf1491aff69b369eda9eb Parents: a9fc71d Author: Greg Hogan <[email protected]> Authored: Mon May 9 14:42:56 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Mon May 16 14:03:28 2016 -0400 ---------------------------------------------------------------------- docs/apis/batch/libs/gelly.md | 18 ++ .../examples/LocalClusteringCoefficient.java | 135 +++++++++ .../flink/graph/examples/TriangleListing.java | 125 ++++++++ .../undirected/LocalClusteringCoefficient.java | 252 ++++++++++++++++ .../clustering/undirected/TriangleListing.java | 302 +++++++++++++++++++ .../apache/flink/graph/utils/Murmur3_32.java | 107 +++++++ .../LocalClusteringCoefficientTest.java | 81 +++++ .../undirected/TriangleListingTest.java | 78 +++++ 8 files changed, 1098 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/docs/apis/batch/libs/gelly.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index c68001b..dd0b4c1 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1831,6 +1831,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * [GSA Triangle Count](#gsa-triangle-count) * [Triangle Enumerator](#triangle-enumerator) * [Summarization](#summarization) +* [Local Clustering 
Coefficient](#local-clustering-coefficient) Gelly's library methods can be used by simply calling the `run()` method on the input graph: @@ -2050,6 +2051,23 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Local Clustering Coefficient + +#### Overview +The local clustering coefficient measures the connectedness of each vertex's neighborhood. Scores range from 0.0 (no +edges between neighbors) to 1.0 (neighborhood is a clique). + +#### Details +An edge between a vertex's neighbors is a triangle. Counting edges between neighbors is equivalent to counting the +number of triangles which include the vertex. The clustering coefficient score is the number of edges between neighbors +divided by the number of potential edges between neighbors. + +See the [Triangle Enumerator](#triangle-enumerator) library method for a detailed explanation of triangle enumeration. + +#### Usage +The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID, +vertex degree, and number of triangles containing the vertex. The vertex ID must be `Comparable` and `Copyable`.
+ {% top %} Graph Algorithms http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java new file mode 100644 index 0000000..2465da8 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of Local Clustering Coefficient. + * + * This example generates an undirected RMat graph with the given scale and + * edge factor then calculates the local clustering coefficient for each vertex. 
+ *
+ * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
+ */
+public class LocalClusteringCoefficient {
+
+	public static final int DEFAULT_SCALE = 10;
+
+	public static final int DEFAULT_EDGE_FACTOR = 16;
+
+	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	/**
+	 * Generates an RMat graph and runs Local Clustering Coefficient on it.
+	 *
+	 * Recognized parameters: --scale (log2 of the generated vertex count,
+	 * default 10), --edge_factor (generated edge count per vertex, default 16),
+	 * --clip_and_flip (default true), and --output, which must be one of
+	 * "print", "hash", or "csv". The "csv" output also reads --filename,
+	 * --row_delimiter, and --field_delimiter. Any other --output value prints
+	 * usage and returns without reporting a runtime.
+	 *
+	 * @param args command-line arguments, parsed with ParameterTool
+	 * @throws Exception if building or executing the Flink job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// Generate RMat graph
+		int scale = parameters.getInt("scale", DEFAULT_SCALE);
+		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		// Shift a long, not an int: Java masks an int shift distance to 5 bits,
+		// so "1 << scale" overflows for scale >= 31 (and 1 << 32 == 1).
+		long vertexCount = 1L << scale;
+		long edgeCount = vertexCount * edgeFactor;
+
+		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.setSimpleGraph(true, clipAndFlip)
+			.generate();
+
+		// Raw type: the result element type depends on which ID type is chosen below.
+		DataSet cc;
+
+		// NOTE(review): with scale == 32 the vertex IDs exceed Integer.MAX_VALUE yet
+		// still take the IntValue path; confirm LongValueToIntValue handles that range.
+		if (scale > 32) {
+			cc = graph
+				.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+		} else {
+			cc = graph
+				.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+				.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
+		}
+
+		switch (parameters.get("output", "")) {
+		case "print":
+			for (Object e: cc.collect()) {
+				Result result = (Result)e;
+				System.out.println(result.toVerboseString());
+			}
+			break;
+
+		case "hash":
+			System.out.println(DataSetUtils.checksumHashCode(cc));
+			break;
+
+		case "csv":
+			String filename = parameters.get("filename");
+
+			String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER);
+			String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+
+			cc.writeAsCsv(filename, row_delimiter, field_delimiter);
+
+			env.execute();
+			break;
+		default:
+			System.out.println("The local clustering coefficient measures the connectedness of each vertex's");
+			System.out.println("neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0");
+			System.out.println("(neighborhood is a clique)");
+			System.out.println("");
+			System.out.println("This algorithm returns tuples containing the vertex ID, the degree of");
+			System.out.println("the vertex, the number of edges between vertex neighbors, and the local");
+			System.out.println("clustering coefficient.");
+			System.out.println("");
+			System.out.println("usage:");
+			System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
+			System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
+			System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
+				" --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
+
+			return;
+		}
+
+		// Every non-default branch above ran a job (collect, checksum, or execute).
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java new file mode 100644 index 0000000..f5f232d --- /dev/null +++
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of Triangle Listing. 
+ *
+ * This example generates an undirected RMat graph with the given scale
+ * and edge factor then lists all triangles.
+ *
+ * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
+ */
+public class TriangleListing {
+
+	public static final int DEFAULT_SCALE = 10;
+
+	public static final int DEFAULT_EDGE_FACTOR = 16;
+
+	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	/**
+	 * Generates an RMat graph and lists its triangles.
+	 *
+	 * Recognized parameters: --scale (log2 of the generated vertex count,
+	 * default 10), --edge_factor (generated edge count per vertex, default 16),
+	 * --clip_and_flip (default true), and --output, which must be one of
+	 * "print", "hash", or "csv". The "csv" output also reads --filename,
+	 * --row_delimiter, and --field_delimiter. Any other --output value prints
+	 * usage and returns without reporting a runtime.
+	 *
+	 * @param args command-line arguments, parsed with ParameterTool
+	 * @throws Exception if building or executing the Flink job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// Generate RMat graph
+		int scale = parameters.getInt("scale", DEFAULT_SCALE);
+		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		// Shift a long, not an int: Java masks an int shift distance to 5 bits,
+		// so "1 << scale" overflows for scale >= 31 (and 1 << 32 == 1).
+		long vertexCount = 1L << scale;
+		long edgeCount = vertexCount * edgeFactor;
+
+		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.setSimpleGraph(true, clipAndFlip)
+			.generate();
+
+		// Raw type: the result element type depends on which ID type is chosen below.
+		DataSet tl;
+
+		// NOTE(review): with scale == 32 the vertex IDs exceed Integer.MAX_VALUE yet
+		// still take the IntValue path; confirm LongValueToIntValue handles that range.
+		if (scale > 32) {
+			tl = graph
+				.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+		} else {
+			tl = graph
+				.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+				.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
+		}
+
+		switch (parameters.get("output", "")) {
+		case "print":
+			tl.print();
+			break;
+
+		case "hash":
+			System.out.println(DataSetUtils.checksumHashCode(tl));
+			break;
+
+		case "csv":
+			String filename = parameters.get("filename");
+
+			String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER);
+			String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+
+			tl.writeAsCsv(filename, row_delimiter, field_delimiter);
+
+			env.execute();
+			break;
+		default:
+			System.out.println("Lists all distinct triangles in the generated RMat graph.");
+			System.out.println("");
+			System.out.println("usage:");
+			System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
+			System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
+			System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
+				" --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
+
+			return;
+		}
+
+		// Every non-default branch above ran a job (print, checksum, or execute).
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java new file mode 100644 index 0000000..0a562d5 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; +import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; + +/** + * The local clustering coefficient measures the connectedness of each vertex's + * neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0 + * (neighborhood is a clique). + * <br/> + * An edge between a vertex's neighbors is a triangle. Counting edges between + * neighbors is equivalent to counting the number of triangles which include + * the vertex. 
+ * <br/>
+ * The input graph must be a simple, undirected graph containing no duplicate
+ * edges or self-loops.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+
+	// Optional configuration
+	private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 * removed from TriangleListing.
+	 *
+	 * CountVertices can be replaced by ".sum(1)" when Flink aggregators use
+	 * code generation.
+	 */
+
+	/**
+	 * Computes, for every vertex, its degree and the number of triangles
+	 * containing it.
+	 * <br/>
+	 * Triangles are listed (unsorted) with TriangleListing, split into one
+	 * (vertex, 1) record per triangle vertex, and summed per vertex. Vertex
+	 * degrees are computed including zero-degree vertices, and a left outer
+	 * join attaches the triangle count, so vertices in no triangle are still
+	 * emitted with a count of zero.
+	 *
+	 * @param input simple, undirected input graph
+	 * @return one Result per vertex: (vertex ID, (degree, triangle count))
+	 * @throws Exception propagated from the nested graph algorithms
+	 */
+	@Override
+	public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// u, v, w
+		DataSet<Tuple3<K,K,K>> triangles = input
+			.run(new TriangleListing<K,VV,EV>()
+				.setSortTriangleVertices(false)
+				.setLittleParallelism(littleParallelism));
+
+		// u, 1
+		DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
+			.flatMap(new SplitTriangles<K>())
+				.name("Split triangle vertices");
+
+		// u, triangle count
+		DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
+			.groupBy(0)
+			.reduce(new CountVertices<K>())
+				.name("Count triangles");
+
+		// u, deg(u)
+		DataSet<Vertex<K, LongValue>> vertexDegree = input
+			.run(new VertexDegree<K, VV, EV>()
+				.setParallelism(littleParallelism)
+				.setIncludeZeroDegreeVertices(true));
+
+		// u, deg(u), triangle count
+		// Left outer join: vertices with no triangles have no vertexTriangleCount record.
+		return vertexDegree
+			.leftOuterJoin(vertexTriangleCount)
+			.where(0)
+			.equalTo(0)
+			.with(new JoinVertexDegreeWithTriangleCount<K>())
+				.setParallelism(littleParallelism)
+				.name("Clustering coefficient");
+	}
+
+	/**
+	 * Emits the three vertex IDs comprising each triangle along with an initial count.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class SplitTriangles<T>
+	implements FlatMapFunction<Tuple3<T, T, T>, Tuple2<T, LongValue>> {
+		// A single output tuple is reused for every emitted record; its count
+		// field is the same LongValue holding 1, and only f0 is reassigned.
+		private Tuple2<T, LongValue> output = new Tuple2<>(null, new LongValue(1));
+
+		@Override
+		public void flatMap(Tuple3<T, T, T> value, Collector<Tuple2<T, LongValue>> out)
+				throws Exception {
+			output.f0 = value.f0;
+			out.collect(output);
+
+			output.f0 = value.f1;
+			out.collect(output);
+
+			output.f0 = value.f2;
+			out.collect(output);
+		}
+	}
+
+	/**
+	 * Combines the count of each vertex ID.
+	 *
+	 * @param <T> ID type
+	 */
+	@FunctionAnnotation.ForwardedFields("0")
+	private static class CountVertices<T>
+	implements ReduceFunction<Tuple2<T, LongValue>> {
+		@Override
+		public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right)
+				throws Exception {
+			// Accumulates into left's LongValue in place and returns left.
+			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
+			return left;
+		}
+	}
+
+	/**
+	 * Joins the vertex and degree with the vertex's triangle count.
+	 *
+	 * @param <T> ID type
+	 */
+	@FunctionAnnotation.ForwardedFieldsFirst("0; 1->1.0")
+	@FunctionAnnotation.ForwardedFieldsSecond("0")
+	private static class JoinVertexDegreeWithTriangleCount<T>
+	implements JoinFunction<Vertex<T, LongValue>, Tuple2<T, LongValue>, Result<T>> {
+		// Substituted as the triangle count when the outer join finds no match.
+		private LongValue zero = new LongValue(0);
+
+		// Reused for every joined record.
+		private Result<T> output = new Result<>();
+
+		@Override
+		public Result<T> join(Vertex<T, LongValue> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
+				throws Exception {
+			output.f0 = vertexAndDegree.f0;
+			output.f1.f0 = vertexAndDegree.f1;
+			// vertexAndTriangleCount is null for a vertex contained in no triangle.
+			output.f1.f1 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+
+			return output;
+		}
+	}
+
+	/**
+	 * Wraps the vertex type to encapsulate results from the Clustering Coefficient algorithm.
+	 * The vertex value is the tuple (degree, triangle count).
+	 *
+	 * @param <T> ID type
+	 */
+	public static class Result<T>
+	extends Vertex<T, Tuple2<LongValue, LongValue>> {
+		private static final int HASH_SEED = 0xc23937c1;
+
+		// Reused by hashCode(), which resets it before each computation.
+		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+
+		/**
+		 * The no-arg constructor instantiates contained objects.
+		 */
+		public Result() {
+			f1 = new Tuple2<>();
+		}
+
+		/**
+		 * Get the vertex degree.
+		 *
+		 * @return vertex degree
+		 */
+		public LongValue getDegree() {
+			return f1.f0;
+		}
+
+		/**
+		 * Get the number of triangles containing this vertex; equivalently,
+		 * this is the number of edges between neighbors of this vertex.
+		 *
+		 * @return triangle count
+		 */
+		public LongValue getTriangleCount() {
+			return f1.f1;
+		}
+
+		/**
+		 * Get the local clustering coefficient score. This is computed as the
+		 * number of edges between neighbors, equal to the triangle count,
+		 * divided by the number of potential edges between neighbors,
+		 * degree * (degree - 1) / 2.
+		 *
+		 * A score of {@code Double.NaN} is returned for a vertex with degree 0
+		 * or 1. Such a vertex has no pairs of neighbors, so both the triangle
+		 * count and the number of potential edges between neighbors are zero.
+		 *
+		 * @return local clustering coefficient score
+		 */
+		public double getLocalClusteringCoefficientScore() {
+			long degree = getDegree().getValue();
+			long neighborPairs = degree * (degree - 1) / 2;
+
+			return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs;
+		}
+
+		/**
+		 * Format the vertex ID, degree, triangle count, and local clustering
+		 * coefficient score as a human-readable string.
+		 *
+		 * @return verbose description of this result
+		 */
+		public String toVerboseString() {
+			return "Vertex ID: " + f0
+				+ ", vertex degree: " + getDegree()
+				+ ", triangle count: " + getTriangleCount()
+				+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
+		}
+
+		/**
+		 * Hashes the vertex ID's hashCode together with the degree and the
+		 * triangle count using a seeded Murmur3_32 hasher.
+		 * NOTE(review): equals() is not overridden here; confirm the inherited
+		 * equals stays consistent with this hashCode.
+		 */
+		@Override
+		public int hashCode() {
+			return hasher.reset()
+				.hash(f0.hashCode())
+				.hash(f1.f0.getValue())
+				.hash(f1.f1.getValue())
+				.hash();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java new file mode 100644 index 0000000..e0ad30f --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + 
+/**
+ * Generates a listing of distinct triangles from the input graph.
+ * <br/>
+ * A triangle is a 3-cycle with vertices A, B, and C connected by edges
+ * (A, B), (A, C), and (B, C).
+ * <br/>
+ * The input graph must be a simple, undirected graph containing no duplicate
+ * edges or self-loops.
+ * <br/>
+ * Algorithm from "Graph Twiddling in a MapReduce World", J. D. Cohen,
+ * http://lintool.github.io/UMD-courses/bigdata-2015-Spring/content/Cohen_2009.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
+
+	// Optional configuration
+	private boolean sortTriangleVertices = false;
+
+	private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+	/**
+	 * Normalize the triangle listing such that for each result (K0, K1, K2)
+	 * the vertex IDs are sorted K0 < K1 < K2.
+	 *
+	 * @param sortTriangleVertices whether to output each triangle's vertices in sorted order
+	 * @return this
+	 */
+	public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
+		this.sortTriangleVertices = sortTriangleVertices;
+
+		return this;
+	}
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 * Flink has a self-join and GenerateTriplets is implemented as such.
+	 *
+	 * ProjectTriangles should eventually be replaced by ".projectFirst("*")"
+	 * when projections use code generation.
+	 */
+
+	/**
+	 * Lists each triangle of the input graph exactly once.
+	 * <br/>
+	 * Each edge is oriented from its lower-degree endpoint (ties broken by
+	 * ID). Every vertex then enumerates pairs of its oriented out-neighbors as
+	 * candidate triplets, and a triplet (u, v, w) is kept only if the edge
+	 * (v, w) exists.
+	 *
+	 * @param input simple, undirected input graph
+	 * @return triangles as (u, v, w) with v < w, or with u < v < w when
+	 *         {@link #setSortTriangleVertices(boolean)} is enabled
+	 * @throws Exception propagated from the nested graph algorithms
+	 */
+	@Override
+	public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// u, v where u < v
+		DataSet<Tuple2<K, K>> filteredByID = input
+			.getEdges()
+			.flatMap(new FilterByID<K, EV>())
+				.setParallelism(littleParallelism)
+				.name("Filter by ID");
+
+		// u, v, (edge value, deg(u), deg(v))
+		DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = input
+			.run(new EdgeDegreePair<K, VV, EV>()
+				.setParallelism(littleParallelism));
+
+		// u, v where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
+		DataSet<Tuple2<K, K>> filteredByDegree = pairDegree
+			.flatMap(new FilterByDegree<K, EV>())
+				.setParallelism(littleParallelism)
+				.name("Filter by degree");
+
+		// u, v, w where (u, v) and (u, w) are edges in graph, v < w
+		DataSet<Tuple3<K, K, K>> triplets = filteredByDegree
+			.groupBy(0)
+			.sortGroup(1, Order.ASCENDING)
+			.reduceGroup(new GenerateTriplets<K>())
+				.setParallelism(littleParallelism)
+				.name("Generate triplets");
+
+		// u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w
+		DataSet<Tuple3<K, K, K>> triangles = triplets
+			.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
+			.where(1, 2)
+			.equalTo(0, 1)
+			.with(new ProjectTriangles<K>())
+				.setParallelism(littleParallelism)
+				.name("Triangle listing");
+
+		if (sortTriangleVertices) {
+			triangles = triangles
+				.map(new SortTriangleVertices<K>())
+					.name("Sort triangle vertices");
+		}
+
+		return triangles;
+	}
+
+	/**
+	 * Removes edge values while filtering such that only edges where the
+	 * source vertex ID compares less than the target vertex ID are emitted.
+	 * <br/>
+	 * Since the input graph is a simple graph this filter removes exactly half
+	 * of the original edges.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@ForwardedFields("0; 1")
+	private static final class FilterByID<T extends Comparable<T>, ET>
+	implements FlatMapFunction<Edge<T, ET>, Tuple2<T, T>> {
+		// Reused for every emitted edge.
+		private Tuple2<T, T> edge = new Tuple2<>();
+
+		@Override
+		public void flatMap(Edge<T, ET> value, Collector<Tuple2<T, T>> out)
+				throws Exception {
+			if (value.f0.compareTo(value.f1) < 0) {
+				edge.f0 = value.f0;
+				edge.f1 = value.f1;
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Removes edge values while filtering such that edges where the source
+	 * vertex has lower degree are emitted. If the source and target vertex
+	 * degrees are equal then the edge is emitted if the source vertex ID
+	 * compares less than the target vertex ID.
+	 * <br/>
+	 * Since the input graph is a simple graph this filter removes exactly half
+	 * of the original edges.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@ForwardedFields("0; 1")
+	private static final class FilterByDegree<T extends Comparable<T>, ET>
+	implements FlatMapFunction<Edge<T, Tuple3<ET, LongValue, LongValue>>, Tuple2<T, T>> {
+		// Reused for every emitted edge.
+		private Tuple2<T, T> edge = new Tuple2<>();
+
+		@Override
+		public void flatMap(Edge<T, Tuple3<ET, LongValue, LongValue>> value, Collector<Tuple2<T, T>> out)
+				throws Exception {
+			// f2 holds (edge value, source degree, target degree)
+			Tuple3<ET, LongValue, LongValue> degrees = value.f2;
+			long sourceDegree = degrees.f1.getValue();
+			long targetDegree = degrees.f2.getValue();
+
+			if (sourceDegree < targetDegree ||
+					(sourceDegree == targetDegree && value.f0.compareTo(value.f1) < 0)) {
+				edge.f0 = value.f0;
+				edge.f1 = value.f1;
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Generates the set of triplets by the pairwise enumeration of the open
+	 * neighborhood for each vertex. The number of triplets is quadratic in
+	 * the vertex degree; however, data skew is minimized by only generating
+	 * triplets from the vertex with least degree.
+	 *
+	 * @param <T> ID type
+	 */
+	private static final class GenerateTriplets<T extends CopyableValue<T>>
+	implements GroupReduceFunction<Tuple2<T, T>, Tuple3<T, T, T>> {
+		private Tuple3<T, T, T> output = new Tuple3<>();
+
+		// Copies of the targets seen so far in the current group. The list is
+		// kept across groups so its elements can be overwritten in place;
+		// visitedCount marks how many entries are valid for the current group.
+		private List<T> visited = new ArrayList<>();
+
+		@Override
+		public void reduce(Iterable<Tuple2<T, T>> values, Collector<Tuple3<T, T, T>> out)
+				throws Exception {
+			int visitedCount = 0;
+
+			Iterator<Tuple2<T, T>> iter = values.iterator();
+
+			// A group always holds at least one record, so the first next() is safe.
+			while (true) {
+				Tuple2<T, T> edge = iter.next();
+
+				output.f0 = edge.f0;
+				output.f2 = edge.f1;
+
+				// Targets arrive in ascending order (sortGroup), so every
+				// previously visited target v satisfies v < w = edge.f1.
+				for (int i = 0; i < visitedCount; i++) {
+					output.f1 = visited.get(i);
+					out.collect(output);
+				}
+
+				// The final target never needs to be remembered.
+				if (! iter.hasNext()) {
+					break;
+				}
+
+				// Store a copy rather than the reference; presumably the
+				// iterator may reuse record objects between calls to next().
+				if (visitedCount == visited.size()) {
+					visited.add(edge.f1.copy());
+				} else {
+					edge.f1.copyTo(visited.get(visitedCount));
+				}
+
+				visitedCount += 1;
+			}
+		}
+	}
+
+	/**
+	 * Simply project the triplet as a triangle.
+	 * <br/>
+	 * The first input is the triplet (u, v, w) and the second is the edge
+	 * (v, w) matched on first(1, 2) == second(0, 1). The triplet is returned
+	 * unchanged, so the second input's fields appear at output positions 1
+	 * and 2, not 0 and 1.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("0; 1; 2")
+	@ForwardedFieldsSecond("0->1; 1->2")
+	private static final class ProjectTriangles<T>
+	implements JoinFunction<Tuple3<T, T, T>, Tuple2<T, T>, Tuple3<T, T, T>> {
+		@Override
+		public Tuple3<T, T, T> join(Tuple3<T, T, T> first, Tuple2<T, T> second)
+				throws Exception {
+			return first;
+		}
+	}
+
+	/**
+	 * Reorders the vertices of each emitted triangle (K0, K1, K2)
+	 * into sorted order such that K0 < K1 < K2.
+	 *
+	 * @param <T> ID type
+	 */
+	private static final class SortTriangleVertices<T extends Comparable<T>>
+	implements MapFunction<Tuple3<T, T, T>, Tuple3<T, T, T>> {
+		@Override
+		public Tuple3<T, T, T> map(Tuple3<T, T, T> value)
+				throws Exception {
+			T temp_val;
+
+			// by the triangle listing algorithm we know f1 < f2, so only f0
+			// can be out of place; insert it into the sorted pair (f1, f2)
+			if (value.f0.compareTo(value.f1) > 0) {
+				temp_val = value.f0;
+				value.f0 = value.f1;
+
+				if (temp_val.compareTo(value.f2) <= 0) {
+					value.f1 = temp_val;
+				} else {
+					value.f1 = value.f2;
+					value.f2 = temp_val;
+				}
+			}
+
+			return value;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java new file mode 100644 index 0000000..98f2b97 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import java.io.Serializable;
+
+/**
+ * A resettable implementation of the 32-bit MurmurHash3 algorithm (x86_32 variant).
+ */
+public class Murmur3_32 implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// initial seed, restored into the hash state by reset()
+	private final int seed;
+
+	// number of 32-bit values processed
+	private int count;
+
+	// in-progress hash value
+	private int hash;
+
+	/**
+	 * A resettable implementation of the 32-bit MurmurHash algorithm.
+	 *
+	 * @param seed MurmurHash seed
+	 */
+	public Murmur3_32(int seed) {
+		this.seed = seed;
+		reset();
+	}
+
+	/**
+	 * Re-initialize the MurmurHash state.
+	 *
+	 * @return this
+	 */
+	public Murmur3_32 reset() {
+		count = 0;
+		hash = seed;
+		return this;
+	}
+
+	/**
+	 * Process an {@code integer} value.
+	 *
+	 * @param input 32-bit input value
+	 * @return this
+	 */
+	public Murmur3_32 hash(int input) {
+		count++;
+
+		input *= 0xcc9e2d51;
+		input = Integer.rotateLeft(input, 15); // MurmurHash3 uses ROTL32, not a shift
+		input *= 0x1b873593;
+
+		hash ^= input;
+		hash = Integer.rotateLeft(hash, 13); // ROTL32 preserves the high bits
+		hash = hash * 5 + 0xe6546b64;
+
+		return this;
+	}
+
+	/**
+	 * Process a {@code long} value.
+	 *
+	 * @param input 64-bit input value
+	 * @return this
+	 */
+	public Murmur3_32 hash(long input) {
+		hash((int)(input >>> 32));
+		hash((int)input);
+		return this;
+	}
+
+	/**
+	 * Finalize and return the MurmurHash output; mutates state, so call {@link #reset()} before reuse.
+	 *
+	 * @return 32-bit hash
+	 */
+	public int hash() {
+		hash ^= 4 * count;
+		hash ^= hash >>> 16;
+		hash *= 0x85ebca6b;
+		hash ^= hash >>> 13;
+		hash *= 0xc2b2ae35;
+		hash ^= hash >>> 16;
+
+		return hash;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
new file mode 100644
index 0000000..414f200
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LocalClusteringCoefficientTest
+extends AsmTestBase {
+
+	@Test
+	public void testSimpleGraph()
+			throws Exception {
+		DataSet<Result<IntValue>> cc = undirectedSimpleGraph
+			.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
+
+		String expectedResult =
+			"(0,(2,1))\n" +
+			"(1,(3,2))\n" +
+			"(2,(3,2))\n" +
+			"(3,(4,1))\n" +
+			"(4,(1,0))\n" +
+			"(5,(1,0))";
+
+		TestBaseUtils.compareResultAsText(cc.collect(), expectedResult);
+	}
+
+	@Test
+	public void testCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedTriangleCount = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+		DataSet<Result<LongValue>> cc = completeGraph
+			.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+		assertEquals(completeGraphVertexCount, cc.count()); // the loop below passes vacuously on empty output
+		for (Result<LongValue> result : cc.collect()) {
+			assertEquals(expectedDegree, result.getDegree().getValue());
+			assertEquals(expectedTriangleCount, result.getTriangleCount().getValue());
+		}
+	}
+
+	@Test
+	public void testRMatGraph()
+			throws Exception {
+		DataSet<Result<LongValue>> cc = undirectedRMatGraph
+			.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+
+		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(cc);
+
+		assertEquals(902, checksum.getCount());
+		assertEquals(0x000001b08e783277L, checksum.getChecksum());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
new file mode 100644
index 0000000..0d1ebd0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TriangleListingTest
+extends AsmTestBase {
+
+	@Test
+	public void testSimpleGraph()
+			throws Exception {
+		DataSet<Tuple3<IntValue, IntValue, IntValue>> triangles = undirectedSimpleGraph
+			.run(new TriangleListing<IntValue, NullValue, NullValue>()
+				.setSortTriangleVertices(true));
+
+		String expected =
+			"(0,1,2)\n" +
+			"(1,2,3)";
+
+		TestBaseUtils.compareResultAsText(triangles.collect(), expected);
+	}
+
+	@Test
+	public void testCompleteGraph()
+			throws Exception {
+		long vertexCount = completeGraphVertexCount;
+		long expectedCount = CombinatoricsUtils.binomialCoefficient((int)vertexCount, 3); // every 3-subset of K_n is a triangle
+
+		DataSet<Tuple3<LongValue, LongValue, LongValue>> triangles = completeGraph
+			.run(new TriangleListing<LongValue, NullValue, NullValue>());
+
+		ChecksumHashCode summary = DataSetUtils.checksumHashCode(triangles);
+
+		assertEquals(expectedCount, summary.getCount());
+	}
+
+	@Test
+	public void testRMatGraph()
+			throws Exception {
+		DataSet<Tuple3<LongValue, LongValue, LongValue>> triangles = undirectedRMatGraph
+			.run(new TriangleListing<LongValue, NullValue, NullValue>()
+				.setSortTriangleVertices(true));
+
+		ChecksumHashCode summary = DataSetUtils.checksumHashCode(triangles);
+
+		assertEquals(75049, summary.getCount());
+		assertEquals(0x00000001a5b500afL, summary.getChecksum());
+	}
+}
