Repository: flink
Updated Branches:
  refs/heads/master a9fc71d85 -> c71675f7c


[FLINK-3768] [gelly] Local Clustering Coefficient

The local clustering coefficient measures the connectedness of each
vertex's neighborhood. Scores range from 0.0 (no edges between
neighbors) to 1.0 (neighborhood is a clique).

This closes #1896


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c71675f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c71675f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c71675f7

Branch: refs/heads/master
Commit: c71675f7cbe7e538d62bf1491aff69b369eda9eb
Parents: a9fc71d
Author: Greg Hogan <[email protected]>
Authored: Mon May 9 14:42:56 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Mon May 16 14:03:28 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  18 ++
 .../examples/LocalClusteringCoefficient.java    | 135 +++++++++
 .../flink/graph/examples/TriangleListing.java   | 125 ++++++++
 .../undirected/LocalClusteringCoefficient.java  | 252 ++++++++++++++++
 .../clustering/undirected/TriangleListing.java  | 302 +++++++++++++++++++
 .../apache/flink/graph/utils/Murmur3_32.java    | 107 +++++++
 .../LocalClusteringCoefficientTest.java         |  81 +++++
 .../undirected/TriangleListingTest.java         |  78 +++++
 8 files changed, 1098 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index c68001b..dd0b4c1 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1831,6 +1831,7 @@ Gelly has a growing collection of graph algorithms for 
easily analyzing large-sc
 * [GSA Triangle Count](#gsa-triangle-count)
 * [Triangle Enumerator](#triangle-enumerator)
 * [Summarization](#summarization)
+* [Local Clustering Coefficient](#local-clustering-coefficient)
 
 Gelly's library methods can be used by simply calling the `run()` method on 
the input graph:
 
@@ -2050,6 +2051,23 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Local Clustering Coefficient
+
+#### Overview
+The local clustering coefficient measures the connectedness of each vertex's 
neighborhood. Scores range from 0.0 (no
+edges between neighbors) to 1.0 (neighborhood is a clique).
+
+#### Details
+An edge between two of a vertex's neighbors closes a triangle with the vertex. Counting edges between neighbors is
+therefore equivalent to counting the number of triangles which include the vertex. The clustering coefficient score is
+the number of edges between neighbors divided by the number of potential edges between neighbors,
+`deg(v) * (deg(v) - 1) / 2`.
+
+See the [Triangle Enumerator](#triangle-enumerator) library method for a detailed explanation of triangle enumeration.
+
+#### Usage
+The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID,
+vertex degree, and number of triangles containing the vertex. The vertex ID must implement both `Comparable` and
+`CopyableValue`.
+
 {% top %}
 
 Graph Algorithms

http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
new file mode 100644
index 0000000..2465da8
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Local Clustering Coefficient.
+ *
+ * This example generates an undirected RMat graph with the given scale and
+ * edge factor then calculates the local clustering coefficient for each 
vertex.
+ *
+ * @see 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
+ */
+public class LocalClusteringCoefficient {
+
+       public static final int DEFAULT_SCALE = 10;
+
+       public static final int DEFAULT_EDGE_FACTOR = 16;
+
+       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+       public static void main(String[] args) throws Exception {
+               // Set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+
+               ParameterTool parameters = ParameterTool.fromArgs(args);
+
+               // Generate RMat graph
+               int scale = parameters.getInt("scale", DEFAULT_SCALE);
+               int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
+
+               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
+
+               long vertexCount = 1 << scale;
+               long edgeCount = vertexCount * edgeFactor;
+
+               boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
+
+               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                       .setSimpleGraph(true, clipAndFlip)
+                       .generate();
+
+               DataSet cc;
+
+               if (scale > 32) {
+                       cc = graph
+                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+               } else {
+                       cc = graph
+                               .run(new TranslateGraphIds<LongValue, IntValue, 
NullValue, NullValue>(new LongValueToIntValue()))
+                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+               }
+
+               switch (parameters.get("output", "")) {
+               case "print":
+                       for (Object e: cc.collect()) {
+                               Result result = (Result)e;
+                               System.out.println(result.toVerboseString());
+                       }
+                       break;
+
+               case "hash":
+                       System.out.println(DataSetUtils.checksumHashCode(cc));
+                       break;
+
+               case "csv":
+                       String filename = parameters.get("filename");
+
+                       String row_delimiter = parameters.get("row_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
+                       String field_delimiter = 
parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+
+                       cc.writeAsCsv(filename, row_delimiter, field_delimiter);
+
+                       env.execute();
+                       break;
+               default:
+                       System.out.println("The local clustering coefficient 
measures the connectedness of each vertex's");
+                       System.out.println("neighborhood. Scores range from 0.0 
(no edges between neighbors) to 1.0");
+                       System.out.println("(neighborhood is a clique)");
+                       System.out.println("");
+                       System.out.println("This algorithm returns tuples 
containing the vertex ID, the degree of");
+                       System.out.println("the vertex, the number of edges 
between vertex neighbors, and the local");
+                       System.out.println("clustering coefficient.");
+                       System.out.println("");
+                       System.out.println("usage:");
+                       System.out.println("  LocalClusteringCoefficient 
[--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
+                       System.out.println("  LocalClusteringCoefficient 
[--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
+                       System.out.println("  LocalClusteringCoefficient 
[--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
+                               " --filename FILENAME [--row_delimiter 
ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
+
+                       return;
+               }
+
+               JobExecutionResult result = env.getLastJobExecutionResult();
+
+               NumberFormat nf = NumberFormat.getInstance();
+               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
new file mode 100644
index 0000000..f5f232d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Triangle Listing.
+ *
+ * This example generates an undirected RMat graph with the given scale
+ * and edge factor then lists all triangles.
+ *
+ * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
+ */
+public class TriangleListing {
+
+       public static final int DEFAULT_SCALE = 10;
+
+       public static final int DEFAULT_EDGE_FACTOR = 16;
+
+       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+       public static void main(String[] args) throws Exception {
+               // Set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+
+               ParameterTool parameters = ParameterTool.fromArgs(args);
+
+               // Generate RMat graph
+               int scale = parameters.getInt("scale", DEFAULT_SCALE);
+               int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
+
+               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
+
+               long vertexCount = 1 << scale;
+               long edgeCount = vertexCount * edgeFactor;
+
+               boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
+
+               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                       .setSimpleGraph(true, clipAndFlip)
+                       .generate();
+
+               DataSet tl;
+
+               if (scale > 32) {
+                       tl = graph
+                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
+               } else {
+                       tl = graph
+                               .run(new TranslateGraphIds<LongValue, IntValue, 
NullValue, NullValue>(new LongValueToIntValue()))
+                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>());
+               }
+
+               switch (parameters.get("output", "")) {
+               case "print":
+                       tl.print();
+                       break;
+
+               case "hash":
+                       System.out.println(DataSetUtils.checksumHashCode(tl));
+                       break;
+
+               case "csv":
+                       String filename = parameters.get("filename");
+
+                       String row_delimiter = parameters.get("row_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
+                       String field_delimiter = 
parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+
+                       tl.writeAsCsv(filename, row_delimiter, field_delimiter);
+
+                       env.execute();
+                       break;
+               default:
+                       System.out.println("Lists all distinct triangles in the 
generated RMat graph.");
+                       System.out.println("");
+                       System.out.println("usage:");
+                       System.out.println("  TriangleListing [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output print");
+                       System.out.println("  TriangleListing [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output hash");
+                       System.out.println("  TriangleListing [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output csv" +
+                               " --filename FILENAME [--row_delimiter 
ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
+
+                       return;
+               }
+
+               JobExecutionResult result = env.getLastJobExecutionResult();
+
+               NumberFormat nf = NumberFormat.getInstance();
+               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
new file mode 100644
index 0000000..0a562d5
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * The local clustering coefficient measures the connectedness of each vertex's
+ * neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0
+ * (neighborhood is a clique).
+ * <br/>
+ * An edge between a vertex's neighbors is a triangle. Counting edges between
+ * neighbors is equivalent to counting the number of triangles which include
+ * the vertex.
+ * <br/>
+ * The input graph must be a simple, undirected graph containing no duplicate
+ * edges or self-loops.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class LocalClusteringCoefficient<K extends Comparable<K> & 
CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+
+       // Optional configuration
+       private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+       /**
+        * Override the parallelism of operators processing small amounts of 
data.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int 
littleParallelism) {
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       /*
+        * Implementation notes:
+        *
+        * The requirement that "K extends CopyableValue<K>" can be removed when
+        *   removed from TriangleListing.
+        *
+        * CountVertices can be replaced by ".sum(1)" when Flink aggregators use
+        *   code generation.
+        */
+
+       @Override
+       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               // u, v, w
+               DataSet<Tuple3<K,K,K>> triangles = input
+                       .run(new TriangleListing<K,VV,EV>()
+                               .setSortTriangleVertices(false)
+                               .setLittleParallelism(littleParallelism));
+
+               // u, 1
+               DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
+                       .flatMap(new SplitTriangles<K>())
+                               .name("Split triangle vertices");
+
+               // u, triangle count
+               DataSet<Tuple2<K, LongValue>> vertexTriangleCount = 
triangleVertices
+                       .groupBy(0)
+                       .reduce(new CountVertices<K>())
+                               .name("Count triangles");
+
+               // u, deg(u)
+               DataSet<Vertex<K, LongValue>> vertexDegree = input
+                       .run(new VertexDegree<K, VV, EV>()
+                               .setParallelism(littleParallelism)
+                               .setIncludeZeroDegreeVertices(true));
+
+               // u, deg(u), triangle count
+               return vertexDegree
+                       .leftOuterJoin(vertexTriangleCount)
+                       .where(0)
+                       .equalTo(0)
+                       .with(new JoinVertexDegreeWithTriangleCount<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Clustering coefficient");
+       }
+
+       /**
+        * Emits the three vertex IDs comprising each triangle along with an 
initial count.
+        *
+        * @param <T> ID type
+        */
+       private static class SplitTriangles<T>
+       implements FlatMapFunction<Tuple3<T, T, T>, Tuple2<T, LongValue>> {
+               private Tuple2<T, LongValue> output = new Tuple2<>(null, new 
LongValue(1));
+
+               @Override
+               public void flatMap(Tuple3<T, T, T> value, Collector<Tuple2<T, 
LongValue>> out)
+                               throws Exception {
+                       output.f0 = value.f0;
+                       out.collect(output);
+
+                       output.f0 = value.f1;
+                       out.collect(output);
+
+                       output.f0 = value.f2;
+                       out.collect(output);
+               }
+       }
+
+       /**
+        * Combines the count of each vertex ID.
+        *
+        * @param <T> ID type
+        */
+       @FunctionAnnotation.ForwardedFields("0")
+       private static class CountVertices<T>
+       implements ReduceFunction<Tuple2<T, LongValue>> {
+               @Override
+               public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, 
Tuple2<T, LongValue> right)
+                               throws Exception {
+                       left.f1.setValue(left.f1.getValue() + 
right.f1.getValue());
+                       return left;
+               }
+       }
+
+       /**
+        * Joins the vertex and degree with the vertex's triangle count.
+        *
+        * @param <T> ID type
+        */
+       @FunctionAnnotation.ForwardedFieldsFirst("0; 1->1.0")
+       @FunctionAnnotation.ForwardedFieldsSecond("0")
+       private static class JoinVertexDegreeWithTriangleCount<T>
+       implements JoinFunction<Vertex<T, LongValue>, Tuple2<T, LongValue>, 
Result<T>> {
+               private LongValue zero = new LongValue(0);
+
+               private Result<T> output = new Result<>();
+
+               @Override
+               public Result<T> join(Vertex<T, LongValue> vertexAndDegree, 
Tuple2<T, LongValue> vertexAndTriangleCount)
+                               throws Exception {
+                       output.f0 = vertexAndDegree.f0;
+                       output.f1.f0 = vertexAndDegree.f1;
+                       output.f1.f1 = (vertexAndTriangleCount == null) ? zero 
: vertexAndTriangleCount.f1;
+
+                       return output;
+               }
+       }
+
+       /**
+        * Wraps the vertex type to encapsulate results from the Clustering 
Coefficient algorithm.
+        *
+        * @param <T> ID type
+        */
+       public static class Result<T>
+       extends Vertex<T, Tuple2<LongValue, LongValue>> {
+               private static final int HASH_SEED = 0xc23937c1;
+
+               private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+
+               /**
+                * The no-arg constructor instantiates contained objects.
+                */
+               public Result() {
+                       f1 = new Tuple2<>();
+               }
+
+               /**
+                * Get the vertex degree.
+                *
+                * @return vertex degree
+                */
+               public LongValue getDegree() {
+                       return f1.f0;
+               }
+
+               /**
+                * Get the number of triangles containing this vertex; 
equivalently,
+                * this is the number of edges between neighbors of this vertex.
+                *
+                * @return triangle count
+                */
+               public LongValue getTriangleCount() {
+                       return f1.f1;
+               }
+
+               /**
+                * Get the local clustering coefficient score. This is computed 
as the
+                * number of edges between neighbors, equal to the triangle 
count,
+                * divided by the number of potential edges between neighbors.
+                *
+                * A score of {@code Double.NaN} is returned for a vertex with 
degree 1
+                * for which both the triangle count and number of neighbors 
are zero.
+                *
+                * @return local clustering coefficient score
+                */
+               public double getLocalClusteringCoefficientScore() {
+                       long degree = getDegree().getValue();
+                       long neighborPairs = degree * (degree - 1) / 2;
+
+                       return (neighborPairs == 0) ? Double.NaN : 
getTriangleCount().getValue() / (double)neighborPairs;
+               }
+
+               public String toVerboseString() {
+                       return "Vertex ID: " + f0
+                               + ", vertex degree: " + getDegree()
+                               + ", triangle count: " + getTriangleCount()
+                               + ", local clustering coefficient: " + 
getLocalClusteringCoefficientScore();
+               }
+
+               @Override
+               public int hashCode() {
+                       return hasher.reset()
+                               .hash(f0.hashCode())
+                               .hash(f1.f0.getValue())
+                               .hash(f1.f1.getValue())
+                               .hash();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
new file mode 100644
index 0000000..e0ad30f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Generates a listing of distinct triangles from the input graph.
+ * <br/>
+ * A triangle is a 3-cycle with vertices A, B, and C connected by edges
+ * (A, B), (A, C), and (B, C).
+ * <br/>
+ * The input graph must be a simple, undirected graph containing no duplicate
+ * edges or self-loops.
+ * <br/>
+ * Algorithm from "Graph Twiddling in a MapReduce World", J. D. Cohen,
+ * http://lintool.github.io/UMD-courses/bigdata-2015-Spring/content/Cohen_2009.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
+
+       // Optional configuration
+       private boolean sortTriangleVertices = false;
+
+       private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+       /**
+        * Normalize the triangle listing such that for each result (K0, K1, K2)
+        * the vertex IDs are sorted K0 < K1 < K2.
+        *
+        * @param sortTriangleVertices whether to output each triangle's vertices in sorted order
+        * @return this
+        */
+       public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
+               this.sortTriangleVertices = sortTriangleVertices;
+
+               return this;
+       }
+
+       /**
+        * Override the parallelism of operators processing small amounts of data.
+        * <br/>
+        * This algorithm applies the value to every operator and sub-algorithm
+        * it creates in {@link #run(Graph)}, including the optional vertex sort.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) {
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       /*
+        * Implementation notes:
+        *
+        * The requirement that "K extends CopyableValue<K>" can be removed when
+        *   Flink has a self-join and GenerateTriplets is implemented as such.
+        *
+        * ProjectTriangles should eventually be replaced by ".projectFirst("*")"
+        *   when projections use code generation.
+        */
+
+       @Override
+       public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               // u, v where u < v
+               DataSet<Tuple2<K, K>> filteredByID = input
+                       .getEdges()
+                       .flatMap(new FilterByID<K, EV>())
+                               .setParallelism(littleParallelism)
+                               .name("Filter by ID");
+
+               // u, v, (edge value, deg(u), deg(v))
+               DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = input
+                       .run(new EdgeDegreePair<K, VV, EV>()
+                               .setParallelism(littleParallelism));
+
+               // u, v where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
+               DataSet<Tuple2<K, K>> filteredByDegree = pairDegree
+                       .flatMap(new FilterByDegree<K, EV>())
+                               .setParallelism(littleParallelism)
+                               .name("Filter by degree");
+
+               // u, v, w where (u, v) and (u, w) are edges in graph, v < w
+               DataSet<Tuple3<K, K, K>> triplets = filteredByDegree
+                       .groupBy(0)
+                       .sortGroup(1, Order.ASCENDING)
+                       .reduceGroup(new GenerateTriplets<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Generate triplets");
+
+               // u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w;
+               // since v < w the closing edge (v, w) is found among the edges filtered by ID
+               DataSet<Tuple3<K, K, K>> triangles = triplets
+                       .join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
+                       .where(1, 2)
+                       .equalTo(0, 1)
+                       .with(new ProjectTriangles<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Triangle listing");
+
+               if (sortTriangleVertices) {
+                       triangles = triangles
+                               .map(new SortTriangleVertices<K>())
+                                       .setParallelism(littleParallelism)
+                                       .name("Sort triangle vertices");
+               }
+
+               return triangles;
+       }
+
+       /**
+        * Removes edge values while filtering such that only edges where the
+        * source vertex ID compares less than the target vertex ID are emitted.
+        * <br/>
+        * Since the input graph is a simple graph this filter removes exactly half
+        * of the original edges.
+        *
+        * @param <T> ID type
+        * @param <ET> edge value type
+        */
+       @ForwardedFields("0; 1")
+       private static final class FilterByID<T extends Comparable<T>, ET>
+       implements FlatMapFunction<Edge<T, ET>, Tuple2<T, T>> {
+               private Tuple2<T, T> edge = new Tuple2<>();
+
+               @Override
+               public void flatMap(Edge<T, ET> value, Collector<Tuple2<T, T>> out)
+                               throws Exception {
+                       if (value.f0.compareTo(value.f1) < 0) {
+                               edge.f0 = value.f0;
+                               edge.f1 = value.f1;
+                               out.collect(edge);
+                       }
+               }
+       }
+
+       /**
+        * Removes edge values while filtering such that edges where the source
+        * vertex has lower degree are emitted. If the source and target vertex
+        * degrees are equal then the edge is emitted if the source vertex ID
+        * compares less than the target vertex ID.
+        * <br/>
+        * Since the input graph is a simple graph this filter removes exactly half
+        * of the original edges.
+        *
+        * @param <T> ID type
+        * @param <ET> edge value type
+        */
+       @ForwardedFields("0; 1")
+       private static final class FilterByDegree<T extends Comparable<T>, ET>
+       implements FlatMapFunction<Edge<T, Tuple3<ET, LongValue, LongValue>>, Tuple2<T, T>> {
+               private Tuple2<T, T> edge = new Tuple2<>();
+
+               @Override
+               public void flatMap(Edge<T, Tuple3<ET, LongValue, LongValue>> value, Collector<Tuple2<T, T>> out)
+                               throws Exception {
+                       Tuple3<ET, LongValue, LongValue> degrees = value.f2;
+                       long sourceDegree = degrees.f1.getValue();
+                       long targetDegree = degrees.f2.getValue();
+
+                       if (sourceDegree < targetDegree ||
+                                       (sourceDegree == targetDegree && value.f0.compareTo(value.f1) < 0)) {
+                               edge.f0 = value.f0;
+                               edge.f1 = value.f1;
+                               out.collect(edge);
+                       }
+               }
+       }
+
+       /**
+        * Generates the set of triplets by the pairwise enumeration of the open
+        * neighborhood for each vertex. The number of triplets is quadratic in
+        * the vertex degree; however, data skew is minimized by only generating
+        * triplets from the vertex with least degree.
+        *
+        * @param <T> ID type
+        */
+       private static final class GenerateTriplets<T extends CopyableValue<T>>
+       implements GroupReduceFunction<Tuple2<T, T>, Tuple3<T, T, T>> {
+               private Tuple3<T, T, T> output = new Tuple3<>();
+
+               // buffer of neighbor IDs, reused across groups to limit allocation
+               private List<T> visited = new ArrayList<>();
+
+               @Override
+               public void reduce(Iterable<Tuple2<T, T>> values, Collector<Tuple3<T, T, T>> out)
+                               throws Exception {
+                       int visitedCount = 0;
+
+                       Iterator<Tuple2<T, T>> iter = values.iterator();
+
+                       while (true) {
+                               Tuple2<T, T> edge = iter.next();
+
+                               output.f0 = edge.f0;
+                               output.f2 = edge.f1;
+
+                               // neighbors arrive in ascending order (see sortGroup in run), so every
+                               // previously visited neighbor compares less than the current one
+                               for (int i = 0; i < visitedCount; i++) {
+                                       output.f1 = visited.get(i);
+                                       out.collect(output);
+                               }
+
+                               // the last neighbor pairs with no later neighbor, so it is not buffered
+                               if (! iter.hasNext()) {
+                                       break;
+                               }
+
+                               // store a copy rather than a reference to the input tuple's field
+                               if (visitedCount == visited.size()) {
+                                       visited.add(edge.f1.copy());
+                               } else {
+                                       edge.f1.copyTo(visited.get(visitedCount));
+                               }
+
+                               visitedCount += 1;
+                       }
+               }
+       }
+
+       /**
+        * Simply project the triplet as a triangle.
+        * <br/>
+        * The join matches triplet fields (1, 2) against edge fields (0, 1), so
+        * the edge's fields 0 and 1 appear in output fields 1 and 2.
+        *
+        * @param <T> ID type
+        */
+       @ForwardedFieldsFirst("0; 1; 2")
+       @ForwardedFieldsSecond("0->1; 1->2")
+       private static final class ProjectTriangles<T>
+       implements JoinFunction<Tuple3<T, T, T>, Tuple2<T, T>, Tuple3<T, T, T>> {
+               @Override
+               public Tuple3<T, T, T> join(Tuple3<T, T, T> first, Tuple2<T, T> second)
+                               throws Exception {
+                       return first;
+               }
+       }
+
+       /**
+        * Reorders the vertices of each emitted triangle (K0, K1, K2)
+        * into sorted order such that K0 < K1 < K2.
+        *
+        * @param <T> ID type
+        */
+       private static final class SortTriangleVertices<T extends Comparable<T>>
+       implements MapFunction<Tuple3<T, T, T>, Tuple3<T, T, T>> {
+               @Override
+               public Tuple3<T, T, T> map(Tuple3<T, T, T> value)
+                               throws Exception {
+                       // by the triangle listing algorithm we know f1 < f2, so only f0
+                       // may be out of place; it is moved into the middle or the end
+                       if (value.f0.compareTo(value.f1) > 0) {
+                               T temp = value.f0;
+                               value.f0 = value.f1;
+
+                               if (temp.compareTo(value.f2) <= 0) {
+                                       value.f1 = temp;
+                               } else {
+                                       value.f1 = value.f2;
+                                       value.f2 = temp;
+                               }
+                       }
+
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
new file mode 100644
index 0000000..98f2b97
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import java.io.Serializable;
+
+/**
+ * A resettable implementation of the 32-bit MurmurHash3 algorithm.
+ * <br/>
+ * Values are consumed as whole 32-bit blocks (a {@code long} is split into
+ * its high and low halves), so there is never a partial tail block, and the
+ * length mixed in by {@link #hash()} is four bytes per block processed.
+ */
+public class Murmur3_32 implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // seed supplied at construction; restored into the hash state by reset()
+       private final int seed;
+
+       // number of 32-bit values processed
+       private int count;
+
+       // in-progress hash value
+       private int hash;
+
+       /**
+        * A resettable implementation of the 32-bit MurmurHash algorithm.
+        *
+        * @param seed MurmurHash seed
+        */
+       public Murmur3_32(int seed) {
+               this.seed = seed;
+
+               // initialize state directly rather than calling the overridable reset()
+               this.count = 0;
+               this.hash = seed;
+       }
+
+       /**
+        * Re-initialize the MurmurHash state.
+        *
+        * @return this
+        */
+       public Murmur3_32 reset() {
+               count = 0;
+               hash = seed;
+               return this;
+       }
+
+       /**
+        * Process an {@code integer} value.
+        *
+        * @param input 32-bit input value
+        * @return this
+        */
+       public Murmur3_32 hash(int input) {
+               count++;
+
+               // MurmurHash3 block mixing uses bit rotations; a plain left shift would
+               // discard the high bits of both the mixed input and the running hash
+               input *= 0xcc9e2d51;
+               input = Integer.rotateLeft(input, 15);
+               input *= 0x1b873593;
+
+               hash ^= input;
+               hash = Integer.rotateLeft(hash, 13);
+               hash = hash * 5 + 0xe6546b64;
+
+               return this;
+       }
+
+       /**
+        * Process a {@code long} value as two 32-bit values, high half first.
+        *
+        * @param input 64-bit input value
+        * @return this
+        */
+       public Murmur3_32 hash(long input) {
+               hash((int)(input >>> 32));
+               hash((int)input);
+               return this;
+       }
+
+       /**
+        * Finalize and return the MurmurHash output.
+        * <br/>
+        * Finalization modifies the internal state in place, so {@link #reset()}
+        * should be called before hashing a new sequence of values.
+        *
+        * @return 32-bit hash
+        */
+       public int hash() {
+               hash ^= 4 * count;
+               hash ^= hash >>> 16;
+               hash *= 0x85ebca6b;
+               hash ^= hash >>> 13;
+               hash *= 0xc2b2ae35;
+               hash ^= hash >>> 16;
+
+               return hash;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
new file mode 100644
index 0000000..414f200
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link LocalClusteringCoefficient}.
+ */
+public class LocalClusteringCoefficientTest
+extends AsmTestBase {
+
+       @Test
+       public void testSimpleGraph()
+                       throws Exception {
+               DataSet<Result<IntValue>> cc = undirectedSimpleGraph
+                       .run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
+
+               String expectedResult =
+                       "(0,(2,1))\n" +
+                       "(1,(3,2))\n" +
+                       "(2,(3,2))\n" +
+                       "(3,(4,1))\n" +
+                       "(4,(1,0))\n" +
+                       "(5,(1,0))";
+
+               TestBaseUtils.compareResultAsText(cc.collect(), expectedResult);
+       }
+
+       @Test
+       public void testCompleteGraph()
+                       throws Exception {
+               long expectedDegree = completeGraphVertexCount - 1;
+               long expectedTriangleCount = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+               DataSet<Result<LongValue>> cc = completeGraph
+                       .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+
+               // count the results so the test cannot pass vacuously on empty output
+               long resultCount = 0;
+
+               for (Result<LongValue> result : cc.collect()) {
+                       assertEquals(expectedDegree, result.getDegree().getValue());
+                       assertEquals(expectedTriangleCount, result.getTriangleCount().getValue());
+                       resultCount++;
+               }
+
+               // one result per vertex of the complete graph
+               assertEquals(completeGraphVertexCount, resultCount);
+       }
+
+       @Test
+       public void testRMatGraph()
+                       throws Exception {
+               DataSet<Result<LongValue>> cc = undirectedRMatGraph
+                       .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+
+               // NOTE(review): the checksum presumably aggregates element hash codes, so
+               // this constant must be regenerated whenever Result.hashCode changes
+               ChecksumHashCode checksum = DataSetUtils.checksumHashCode(cc);
+
+               assertEquals(902, checksum.getCount());
+               assertEquals(0x000001b08e783277L, checksum.getChecksum());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c71675f7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
new file mode 100644
index 0000000..0d1ebd0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TriangleListing}.
+ */
+public class TriangleListingTest
+extends AsmTestBase {
+
+       @Test
+       public void testSimpleGraph()
+                       throws Exception {
+               TriangleListing<IntValue, NullValue, NullValue> algorithm =
+                       new TriangleListing<IntValue, NullValue, NullValue>()
+                               .setSortTriangleVertices(true);
+
+               DataSet<Tuple3<IntValue, IntValue, IntValue>> triangles = undirectedSimpleGraph.run(algorithm);
+
+               String expectedResult =
+                       "(0,1,2)\n" +
+                       "(1,2,3)";
+
+               TestBaseUtils.compareResultAsText(triangles.collect(), expectedResult);
+       }
+
+       @Test
+       public void testCompleteGraph()
+                       throws Exception {
+               long vertexDegree = completeGraphVertexCount - 1;
+
+               // each vertex closes C(degree, 2) triangles and every triangle is shared by three vertices
+               long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)vertexDegree, 2) / 3;
+
+               DataSet<Tuple3<LongValue, LongValue, LongValue>> triangles = completeGraph
+                       .run(new TriangleListing<LongValue, NullValue, NullValue>());
+
+               assertEquals(expectedCount, DataSetUtils.checksumHashCode(triangles).getCount());
+       }
+
+       @Test
+       public void testRMatGraph()
+                       throws Exception {
+               TriangleListing<LongValue, NullValue, NullValue> algorithm =
+                       new TriangleListing<LongValue, NullValue, NullValue>()
+                               .setSortTriangleVertices(true);
+
+               ChecksumHashCode checksum = DataSetUtils.checksumHashCode(undirectedRMatGraph.run(algorithm));
+
+               assertEquals(75049, checksum.getCount());
+               assertEquals(0x00000001a5b500afL, checksum.getChecksum());
+       }
+}

Reply via email to