Repository: flink
Updated Branches:
  refs/heads/master ad8e665f0 -> 58850f292


[FLINK-4264] [gelly] New GraphMetrics driver

Updates VertexMetrics analytics, adds directed and undirected
EdgeMetrics analytics, and includes a new GraphMetrics driver.

This closes #2295


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58850f29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58850f29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58850f29

Branch: refs/heads/master
Commit: 58850f29243559c67bdc07c737bc5772c3f098db
Parents: ad8e665
Author: Greg Hogan <[email protected]>
Authored: Wed Jul 20 15:44:54 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Wed Aug 24 10:05:12 2016 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/driver/GraphMetrics.java | 232 +++++++++
 .../graph/examples/ClusteringCoefficient.java   |  14 +-
 .../org/apache/flink/graph/examples/HITS.java   |   2 +-
 .../flink/graph/examples/JaccardIndex.java      |   2 +-
 .../flink/graph/examples/TriangleListing.java   |   2 +-
 .../degree/annotate/directed/VertexDegrees.java |   2 +-
 .../directed/LocalClusteringCoefficient.java    |  32 +-
 .../undirected/LocalClusteringCoefficient.java  |  31 +-
 .../library/metric/directed/EdgeMetrics.java    | 507 +++++++++++++++++++
 .../library/metric/directed/VertexMetrics.java  | 102 +++-
 .../library/metric/undirected/EdgeMetrics.java  | 445 ++++++++++++++++
 .../metric/undirected/VertexMetrics.java        |  63 ++-
 .../metric/directed/EdgeMetricsTest.java        |  90 ++++
 .../metric/directed/VertexMetricsTest.java      |  19 +-
 .../metric/undirected/EdgeMetricsTest.java      |  89 ++++
 .../metric/undirected/VertexMetricsTest.java    |  14 +-
 16 files changed, 1600 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
new file mode 100644
index 0000000..cc265bb
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.driver;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Computes vertex and edge metrics on a directed or undirected graph.
+ *
+ * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
+ */
+public class GraphMetrics {
+
+       public static final int DEFAULT_SCALE = 10;
+
+       public static final int DEFAULT_EDGE_FACTOR = 16;
+
+       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+       private static void printUsage() {
+               System.out.println(WordUtils.wrap("Computes vertex and edge 
metrics on a directed or undirected graph.", 80));
+               System.out.println();
+               System.out.println("usage: GraphMetrics --directed <true | 
false> --input <csv | rmat [options]>");
+               System.out.println();
+               System.out.println("options:");
+               System.out.println("  --input csv --type <integer | string> 
[--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter 
LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
+               System.out.println("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]");
+       }
+
+       public static void main(String[] args) throws Exception {
+               // Set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+
+               ParameterTool parameters = ParameterTool.fromArgs(args);
+               if (! parameters.has("directed")) {
+                       printUsage();
+                       return;
+               }
+               boolean directedAlgorithm = parameters.getBoolean("directed");
+
+               GraphAnalytic vm;
+               GraphAnalytic em;
+
+               switch (parameters.get("input", "")) {
+                       case "csv": {
+                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+                               GraphCsvReader reader = Graph
+                                       
.fromCsvReader(parameters.get("input_filename"), env)
+                                               .ignoreCommentsEdges("#")
+                                               
.lineDelimiterEdges(lineDelimiter)
+                                               
.fieldDelimiterEdges(fieldDelimiter);
+
+                               switch (parameters.get("type", "")) {
+                                       case "integer": {
+                                               Graph<LongValue, NullValue, 
NullValue> graph = reader
+                                                       
.keyType(LongValue.class);
+
+                                               if (directedAlgorithm) {
+                                                       if 
(parameters.getBoolean("simplify", false)) {
+                                                               graph = graph
+                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>());
+                                                       }
+
+                                                       vm = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, 
NullValue, NullValue>());
+                                                       em = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, 
NullValue, NullValue>());
+                                               } else {
+                                                       if 
(parameters.getBoolean("simplify", false)) {
+                                                               graph = graph
+                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false));
+                                                       }
+
+                                                       vm = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, 
NullValue, NullValue>());
+                                                       em = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, 
NullValue, NullValue>());
+                                               }
+                                       } break;
+
+                                       case "string": {
+                                               Graph<StringValue, NullValue, 
NullValue> graph = reader
+                                                       
.keyType(StringValue.class);
+
+                                               if (directedAlgorithm) {
+                                                       if 
(parameters.getBoolean("simplify", false)) {
+                                                               graph = graph
+                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>());
+                                                       }
+
+                                                       vm = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, 
NullValue, NullValue>());
+                                                       em = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, 
NullValue, NullValue>());
+                                               } else {
+                                                       if 
(parameters.getBoolean("simplify", false)) {
+                                                               graph = graph
+                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false));
+                                                       }
+
+                                                       vm = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, 
NullValue, NullValue>());
+                                                       em = graph
+                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, 
NullValue, NullValue>());
+                                               }
+                                       } break;
+
+                                       default:
+                                               printUsage();
+                                               return;
+                               }
+                               } break;
+
+                       case "rmat": {
+                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
+                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
+
+                               long vertexCount = 1L << scale;
+                               long edgeCount = vertexCount * edgeFactor;
+
+
+                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                                       .generate();
+
+                               if (directedAlgorithm) {
+                                       if (scale > 32) {
+                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>());
+
+                                               vm = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, 
NullValue, NullValue>());
+                                               em = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, 
NullValue, NullValue>());
+                                       } else {
+                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>());
+
+                                               vm = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, 
NullValue, NullValue>());
+                                               em = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, 
NullValue>());
+                                       }
+                               } else {
+                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+                                       if (scale > 32) {
+                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
+
+                                               vm = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, 
NullValue, NullValue>());
+                                               em = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, 
NullValue, NullValue>());
+                                       } else {
+                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip));
+
+                                               vm = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, 
NullValue, NullValue>());
+                                               em = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, 
NullValue, NullValue>());
+                                       }
+                               }
+                               } break;
+
+                       default:
+                               printUsage();
+                               return;
+               }
+
+               env.execute("Graph Metrics");
+
+               System.out.print("Vertex metrics:\n  ");
+               System.out.println(vm.getResult().toString().replace(";", "\n 
"));
+               System.out.print("\nEdge metrics:\n  ");
+               System.out.println(em.getResult().toString().replace(";", "\n 
"));
+
+               JobExecutionResult result = env.getLastJobExecutionResult();
+
+               NumberFormat nf = NumberFormat.getInstance();
+               System.out.println("\nExecution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 8641428..e099e2b 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -72,7 +72,7 @@ public class ClusteringCoefficient {
                System.out.println(WordUtils.wrap("This algorithm returns 
tuples containing the vertex ID, the degree of" +
                        " the vertex, and the number of edges between vertex 
neighbors.", 80));
                System.out.println();
-               System.out.println("usage: ClusteringCoefficient --directed 
<true | false> --input <csv | rmat [options]> --output <print | hash | csv 
[options]");
+               System.out.println("usage: ClusteringCoefficient --directed 
<true | false> --input <csv | rmat [options]> --output <print | hash | csv 
[options]>");
                System.out.println();
                System.out.println("options:");
                System.out.println("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]");
@@ -174,7 +174,8 @@ public class ClusteringCoefficient {
                                                gcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
                                                lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                               
.setIncludeZeroDegreeVertices(false));
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
                                                        .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
@@ -183,7 +184,8 @@ public class ClusteringCoefficient {
                                                gcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
                                                lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
+                                                               
.setIncludeZeroDegreeVertices(false));
                                        }
                                } else {
                                        boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
@@ -195,7 +197,8 @@ public class ClusteringCoefficient {
                                                gcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
                                                lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                               
.setIncludeZeroDegreeVertices(false));
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
                                                        .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
@@ -204,7 +207,8 @@ public class ClusteringCoefficient {
                                                gcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
                                                lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
+                                                               
.setIncludeZeroDegreeVertices(false));
                                        }
                                }
                        } break;

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
index c772a3a..59612d9 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
@@ -65,7 +65,7 @@ public class HITS {
                        " scores for every vertex in a directed graph. A good 
\"hub\" links to good \"authorities\"" +
                        " and good \"authorities\" are linked from good 
\"hubs\".", 80));
                System.out.println();
-               System.out.println("usage: HITS --input <csv | rmat [options]> 
--output <print | hash | csv [options]");
+               System.out.println("usage: HITS --input <csv | rmat [options]> 
--output <print | hash | csv [options]>");
                System.out.println();
                System.out.println("options:");
                System.out.println("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]");

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 2158fa2..824aab7 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -69,7 +69,7 @@ public class JaccardIndex {
                System.out.println(WordUtils.wrap("This algorithm returns 
4-tuples containing two vertex IDs, the" +
                        " number of shared neighbors, and the number of 
distinct neighbors.", 80));
                System.out.println();
-               System.out.println("usage: JaccardIndex --input <csv | rmat 
[options]> --output <print | hash | csv [options]");
+               System.out.println("usage: JaccardIndex --input <csv | rmat 
[options]> --output <print | hash | csv [options]>");
                System.out.println();
                System.out.println("options:");
                System.out.println("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]");

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index cd06dde..f3ce708 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -66,7 +66,7 @@ public class TriangleListing {
                System.out.println(WordUtils.wrap("This algorithm returns 
tuples containing the vertex IDs for each triangle and" +
                        " for directed graphs a bitmask indicating the presence 
of the six potential connecting edges.", 80));
                System.out.println();
-               System.out.println("usage: TriangleListing --directed <true | 
false> --input <csv | rmat [options]> --output <print | hash | csv [options]");
+               System.out.println("usage: TriangleListing --directed <true | 
false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
                System.out.println();
                System.out.println("options:");
                System.out.println("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]");

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 9fef221..84873bc 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -86,7 +86,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, 
Degrees>> {
 
        @Override
        protected String getAlgorithmName() {
-               return VertexOutDegree.class.getName();
+               return VertexDegrees.class.getName();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index e0defcd..22c8b41 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees
 import 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -59,9 +60,26 @@ public class LocalClusteringCoefficient<K extends 
Comparable<K> & CopyableValue<
 extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
+       private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(true, true);
+
        private int littleParallelism = PARALLELISM_DEFAULT;
 
        /**
+        * By default the vertex set is checked for zero degree vertices. When 
this
+        * flag is disabled only clustering coefficient scores for vertices with
+        * a degree of at least one will be produced.
+        *
+        * @param includeZeroDegreeVertices whether to output scores for 
vertices
+        *                                  with a degree of zero
+        * @return this
+        */
+       public LocalClusteringCoefficient<K, VV, EV> 
setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+               this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
+
+               return this;
+       }
+
+       /**
         * Override the parallelism of operators processing small amounts of 
data.
         *
         * @param littleParallelism operator parallelism
@@ -90,6 +108,16 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
 
                LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) 
other;
 
+               // verify that configurations can be merged
+
+               if 
(includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+                       return false;
+               }
+
+               // merge configurations
+
+               
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+
                littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
 
                return true;
@@ -128,8 +156,8 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
                // u, deg(u)
                DataSet<Vertex<K, Degrees>> vertexDegree = input
                        .run(new VertexDegrees<K, VV, EV>()
-                               .setParallelism(littleParallelism)
-                               .setIncludeZeroDegreeVertices(true));
+                               
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
+                               .setParallelism(littleParallelism));
 
                // u, deg(u), triangle count
                return vertexDegree

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index cd859d9..4b4bf07 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -59,9 +60,26 @@ public class LocalClusteringCoefficient<K extends 
Comparable<K> & CopyableValue<
 extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
+       private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(true, true);
+
        private int littleParallelism = PARALLELISM_DEFAULT;
 
        /**
+        * By default the vertex set is checked for zero degree vertices. When 
this
+        * flag is disabled only clustering coefficient scores for vertices with
+        * a degree of at least one will be produced.
+        *
+        * @param includeZeroDegreeVertices whether to output scores for 
vertices
+        *                                  with a degree of zero
+        * @return this
+        */
+       public LocalClusteringCoefficient<K, VV, EV> 
setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+               this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
+
+               return this;
+       }
+
+       /**
         * Override the parallelism of operators processing small amounts of 
data.
         *
         * @param littleParallelism operator parallelism
@@ -91,6 +109,15 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
 
                LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) 
other;
 
+               // verify that configurations can be merged
+
+               if 
(includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+                       return false;
+               }
+
+               // merge configurations
+
+               
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
                littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
 
                return true;
@@ -129,8 +156,8 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
                // u, deg(u)
                DataSet<Vertex<K, LongValue>> vertexDegree = input
                        .run(new VertexDegree<K, VV, EV>()
-                               .setParallelism(littleParallelism)
-                               .setIncludeZeroDegreeVertices(true));
+                               
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
+                               .setParallelism(littleParallelism));
 
                // u, deg(u), triangle count
                return vertexDegree

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
new file mode 100644
index 0000000..167e31c
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the following edge metrics in a directed graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triangle triplets
+ *  - number of rectangle triplets
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum out degree
+ *  - maximum in degree
+ *  - maximum number of triangle triplets
+ *  - maximum number of rectangle triplets
+ *  - maximum number of triplets
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+       private String id = new AbstractID().toString();
+
+       private int parallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       /*
+        * Implementation notes:
+        *
+        * Use aggregator to replace SumEdgeStats when aggregators are 
rewritten to use
+        *   a hash-combineable hashable-reduce.
+        *
+        * Use distinct to replace ReduceEdgeStats when the combiner can be 
disabled
+        *   with a sorted-reduce forced.
+        */
+
+       @Override
+       public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               super.run(input);
+
+               // s, t, (d(s), d(t))
+               DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair 
= input
+                       .run(new EdgeDegreesPair<K, VV, EV>()
+                               .setParallelism(parallelism));
+
+               // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == 
deg(v) and u < v)
+               DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = 
edgeDegreesPair
+                       .flatMap(new EdgeStats<K, EV>())
+                               .setParallelism(parallelism)
+                               .name("Edge stats")
+                       .groupBy(0, 1)
+                       .reduceGroup(new ReduceEdgeStats<K>())
+                               .setParallelism(parallelism)
+                               .name("Reduce edge stats")
+                       .groupBy(0)
+                       .reduce(new SumEdgeStats<K>())
+                               .setCombineHint(CombineHint.HASH)
+                               .setParallelism(parallelism)
+                               .name("Sum edge stats");
+
+               edgeStats
+                       .output(new EdgeMetricsHelper<K, EV>(id))
+                               .setParallelism(parallelism)
+                               .name("Edge metrics");
+
+               return this;
+       }
+
+       @Override
+       public Result getResult() {
+               JobExecutionResult res = env.getLastJobExecutionResult();
+
+               long vertexCount = res.getAccumulatorResult(id + "-0");
+               long edgeCount = res.getAccumulatorResult(id + "-1");
+               long triangleTripletCount = res.getAccumulatorResult(id + "-2");
+               long rectangleTripletCount = res.getAccumulatorResult(id + 
"-3");
+               long tripletCount = res.getAccumulatorResult(id + "-4");
+               long maximumDegree = res.getAccumulatorResult(id + "-5");
+               long maximumOutDegree = res.getAccumulatorResult(id + "-6");
+               long maximumInDegree = res.getAccumulatorResult(id + "-7");
+               long maximumTriangleTriplets = res.getAccumulatorResult(id + 
"-8");
+               long maximumRectangleTriplets = res.getAccumulatorResult(id + 
"-9");
+               long maximumTriplets = res.getAccumulatorResult(id + "-a");
+
+               return new Result(vertexCount, edgeCount, triangleTripletCount, 
rectangleTripletCount, tripletCount,
+                       maximumDegree, maximumOutDegree, maximumInDegree,
+                       maximumTriangleTriplets, maximumRectangleTriplets, 
maximumTriplets);
+       }
+
+       /**
+        * Produces a pair of tuples. The first tuple contains the source 
vertex ID,
+        * the target vertex ID, the source degrees, and the low-order count. 
The
+        * second tuple is the same with the source and target roles reversed.
+        *
+        * The low-order count is one if the source vertex degree is less than 
the
+        * target vertex degree or if the degrees are equal and the source 
vertex
+        * ID compares lower than the target vertex ID; otherwise the low-order
+        * count is zero.
+        *
+        * @param <T> ID type
+        * @param <ET> edge value type
+        */
+       private static final class EdgeStats<T extends Comparable<T>, ET>
+       implements FlatMapFunction<Edge<T, Tuple3<ET, Degrees, Degrees>>, 
Tuple4<T, T, Degrees, LongValue>> {
+               private LongValue zero = new LongValue(0);
+
+               private LongValue one = new LongValue(1);
+
+               private Tuple4<T, T, Degrees, LongValue> output = new 
Tuple4<>();
+
+               @Override
+               public void flatMap(Edge<T, Tuple3<ET, Degrees, Degrees>> edge, 
Collector<Tuple4<T, T, Degrees, LongValue>> out)
+                               throws Exception {
+                       Tuple3<ET, Degrees, Degrees> degrees = edge.f2;
+                       long sourceDegree = degrees.f1.getDegree().getValue();
+                       long targetDegree = degrees.f2.getDegree().getValue();
+
+                       boolean ordered = (sourceDegree < targetDegree
+                               || (sourceDegree == targetDegree && 
edge.f0.compareTo(edge.f1) < 0));
+
+                       output.f0 = edge.f0;
+                       output.f1 = edge.f1;
+                       output.f2 = edge.f2.f1;
+                       output.f3 = ordered ? one : zero;
+                       out.collect(output);
+
+                       output.f0 = edge.f1;
+                       output.f1 = edge.f0;
+                       output.f2 = edge.f2.f2;
+                       output.f3 = ordered ? zero : one;
+                       out.collect(output);
+               }
+       }
+
+       /**
+        * Produces a distinct value for each edge.
+        *
+        * @param <T> ID type
+        */
+       @ForwardedFields("0")
+       private static final class ReduceEdgeStats<T>
+       implements GroupReduceFunction<Tuple4<T, T, Degrees, LongValue>, 
Tuple3<T, Degrees, LongValue>> {
+               Tuple3<T, Degrees, LongValue> output = new Tuple3<>();
+
+               @Override
+               public void reduce(Iterable<Tuple4<T, T, Degrees, LongValue>> 
values, Collector<Tuple3<T, Degrees, LongValue>> out)
+                               throws Exception {
+                       Tuple4<T, T, Degrees, LongValue> value = 
values.iterator().next();
+
+                       output.f0 = value.f0;
+                       output.f1 = value.f2;
+                       output.f2 = value.f3;
+
+                       out.collect(output);
+               }
+       }
+
+       /**
+        * Sums the low-order counts.
+        *
+        * @param <T> ID type
+        */
+       private static class SumEdgeStats<T>
+       implements ReduceFunction<Tuple3<T, Degrees, LongValue>> {
+               @Override
+               public Tuple3<T, Degrees, LongValue> reduce(Tuple3<T, Degrees, 
LongValue> value1, Tuple3<T, Degrees, LongValue> value2)
+                               throws Exception {
+                       value1.f2.setValue(value1.f2.getValue() + 
value2.f2.getValue());
+                       return value1;
+               }
+       }
+
+       /**
+        * Helper class to collect edge metrics.
+        *
+        * @param <T> ID type
+        */
+       private static class EdgeMetricsHelper<T extends Comparable<T>, ET>
+       extends RichOutputFormat<Tuple3<T, Degrees, LongValue>> {
+               private final String id;
+
+               private long vertexCount;
+               private long edgeCount;
+               private long triangleTripletCount;
+               private long rectangleTripletCount;
+               private long tripletCount;
+               private long maximumDegree;
+               private long maximumOutDegree;
+               private long maximumInDegree;
+               private long maximumTriangleTriplets;
+               private long maximumRectangleTriplets;
+               private long maximumTriplets;
+
+               /**
+                * This helper class collects edge metrics by scanning over and
+                * discarding elements from the given DataSet.
+                *
+                * The unique id is required because Flink's accumulator 
namespace is
+                * shared among all operators.
+                *
+                * @param id unique string used for accumulator names
+                */
+               public EdgeMetricsHelper(String id) {
+                       this.id = id;
+               }
+
+               @Override
+               public void configure(Configuration parameters) {}
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws 
IOException {}
+
+               @Override
+               public void writeRecord(Tuple3<T, Degrees, LongValue> record) 
throws IOException {
+                       Degrees degrees = record.f1;
+                       long degree = degrees.getDegree().getValue();
+                       long outDegree = degrees.getOutDegree().getValue();
+                       long inDegree = degrees.getInDegree().getValue();
+
+                       long lowDegree = record.f2.getValue();
+                       long highDegree = degree - lowDegree;
+
+                       long triangleTriplets = lowDegree * (lowDegree - 1) / 2;
+                       long rectangleTriplets = triangleTriplets + lowDegree * 
highDegree;
+                       long triplets = degree * (degree - 1) / 2;
+
+                       vertexCount++;
+                       edgeCount += outDegree;
+                       triangleTripletCount += triangleTriplets;
+                       rectangleTripletCount += rectangleTriplets;
+                       tripletCount += triplets;
+                       maximumDegree = Math.max(maximumDegree, degree);
+                       maximumOutDegree = Math.max(maximumOutDegree, 
outDegree);
+                       maximumInDegree = Math.max(maximumInDegree, inDegree);
+                       maximumTriangleTriplets = 
Math.max(maximumTriangleTriplets, triangleTriplets);
+                       maximumRectangleTriplets = 
Math.max(maximumRectangleTriplets, rectangleTriplets);
+                       maximumTriplets = Math.max(maximumTriplets, triplets);
+               }
+
+               @Override
+               public void close() throws IOException {
+                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
+                       getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
+                       getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(triangleTripletCount));
+                       getRuntimeContext().addAccumulator(id + "-3", new 
LongCounter(rectangleTripletCount));
+                       getRuntimeContext().addAccumulator(id + "-4", new 
LongCounter(tripletCount));
+                       getRuntimeContext().addAccumulator(id + "-5", new 
LongMaximum(maximumDegree));
+                       getRuntimeContext().addAccumulator(id + "-6", new 
LongMaximum(maximumOutDegree));
+                       getRuntimeContext().addAccumulator(id + "-7", new 
LongMaximum(maximumInDegree));
+                       getRuntimeContext().addAccumulator(id + "-8", new 
LongMaximum(maximumTriangleTriplets));
+                       getRuntimeContext().addAccumulator(id + "-9", new 
LongMaximum(maximumRectangleTriplets));
+                       getRuntimeContext().addAccumulator(id + "-a", new 
LongMaximum(maximumTriplets));
+               }
+       }
+
+       /**
+        * Wraps edge metrics.
+        */
+       public static class Result {
+               private long vertexCount;
+               private long edgeCount;
+               private long triangleTripletCount;
+               private long rectangleTripletCount;
+               private long tripletCount;
+               private long maximumDegree;
+               private long maximumOutDegree;
+               private long maximumInDegree;
+               private long maximumTriangleTriplets;
+               private long maximumRectangleTriplets;
+               private long maximumTriplets;
+
+               public Result(long vertexCount, long edgeCount, long 
triangleTripletCount, long rectangleTripletCount, long tripletCount,
+                               long maximumDegree, long maximumOutDegree, long 
maximumInDegree,
+                               long maximumTriangleTriplets, long 
maximumRectangleTriplets, long maximumTriplets) {
+                       this.vertexCount = vertexCount;
+                       this.edgeCount = edgeCount;
+                       this.triangleTripletCount = triangleTripletCount;
+                       this.rectangleTripletCount = rectangleTripletCount;
+                       this.tripletCount = tripletCount;
+                       this.maximumDegree = maximumDegree;
+                       this.maximumOutDegree = maximumOutDegree;
+                       this.maximumInDegree = maximumInDegree;
+                       this.maximumTriangleTriplets = maximumTriangleTriplets;
+                       this.maximumRectangleTriplets = 
maximumRectangleTriplets;
+                       this.maximumTriplets = maximumTriplets;
+               }
+
+               /**
+                * Get the number of vertices.
+                *
+                * @return number of vertices
+                */
+               public long getNumberOfVertices() {
+                       return vertexCount;
+               }
+
+               /**
+                * Get the number of edges.
+                *
+                * @return number of edges
+                */
+               public long getNumberOfEdges() {
+                       return edgeCount;
+               }
+
+               /**
+                * Get the number of triangle triplets.
+                *
+                * @return number of triangle triplets
+                */
+               public long getNumberOfTriangleTriplets() {
+                       return triangleTripletCount;
+               }
+
+               /**
+                * Get the number of rectangle triplets.
+                *
+                * @return number of rectangle triplets
+                */
+               public long getNumberOfRectangleTriplets() {
+                       return rectangleTripletCount;
+               }
+
+               /**
+                * Get the number of triplets.
+                *
+                * @return number of triplets
+                */
+               public long getNumberOfTriplets() {
+                       return tripletCount;
+               }
+
+               /**
+                * Get the maximum degree.
+                *
+                * @return maximum degree
+                */
+               public long getMaximumDegree() {
+                       return maximumDegree;
+               }
+
+               /**
+                * Get the maximum out degree.
+                *
+                * @return maximum out degree
+                */
+               public long getMaximumOutDegree() {
+                       return maximumOutDegree;
+               }
+
+               /**
+                * Get the maximum in degree.
+                *
+                * @return maximum in degree
+                */
+               public long getMaximumInDegree() {
+                       return maximumInDegree;
+               }
+
+               /**
+                * Get the maximum triangle triplets.
+                *
+                * @return maximum triangle triplets
+                */
+               public long getMaximumTriangleTriplets() {
+                       return maximumTriangleTriplets;
+               }
+
+               /**
+                * Get the maximum rectangle triplets.
+                *
+                * @return maximum rectangle triplets
+                */
+               public long getMaximumRectangleTriplets() {
+                       return maximumRectangleTriplets;
+               }
+
+               /**
+                * Get the maximum triplets.
+                *
+                * @return maximum triplets
+                */
+               public long getMaximumTriplets() {
+                       return maximumTriplets;
+               }
+
+               @Override
+               public String toString() {
+                       NumberFormat nf = NumberFormat.getInstance();
+
+                       return "vertex count: " + nf.format(vertexCount)
+                               + "; edge count: " + nf.format(edgeCount)
+                               + "; triangle triplet count: " + 
nf.format(triangleTripletCount)
+                               + "; rectangle triplet count: " + 
nf.format(rectangleTripletCount)
+                               + "; triplet count: " + nf.format(tripletCount)
+                               + "; maximum degree: " + 
nf.format(maximumDegree)
+                               + "; maximum out degree: " + 
nf.format(maximumOutDegree)
+                               + "; maximum in degree: " + 
nf.format(maximumInDegree)
+                               + "; maximum triangle triplets: " + 
nf.format(maximumTriangleTriplets)
+                               + "; maximum rectangle triplets: " + 
nf.format(maximumRectangleTriplets)
+                               + "; maximum triplets: " + 
nf.format(maximumTriplets);
+               }
+
+               @Override
+               public int hashCode() {
+                       return new HashCodeBuilder()
+                               .append(vertexCount)
+                               .append(edgeCount)
+                               .append(triangleTripletCount)
+                               .append(rectangleTripletCount)
+                               .append(tripletCount)
+                               .append(maximumDegree)
+                               .append(maximumOutDegree)
+                               .append(maximumInDegree)
+                               .append(maximumTriangleTriplets)
+                               .append(maximumRectangleTriplets)
+                               .append(maximumTriplets)
+                               .hashCode();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == null) { return false; }
+                       if (obj == this) { return true; }
+                       if (obj.getClass() != getClass()) { return false; }
+
+                       Result rhs = (Result)obj;
+
+                       return new EqualsBuilder()
+                               .append(vertexCount, rhs.vertexCount)
+                               .append(edgeCount, rhs.edgeCount)
+                               .append(triangleTripletCount, 
rhs.triangleTripletCount)
+                               .append(rectangleTripletCount, 
rhs.rectangleTripletCount)
+                               .append(tripletCount, rhs.tripletCount)
+                               .append(maximumDegree, rhs.maximumDegree)
+                               .append(maximumOutDegree, rhs.maximumOutDegree)
+                               .append(maximumInDegree, rhs.maximumInDegree)
+                               .append(maximumTriangleTriplets, 
rhs.maximumTriangleTriplets)
+                               .append(maximumRectangleTriplets, 
rhs.maximumRectangleTriplets)
+                               .append(maximumTriplets, rhs.maximumTriplets)
+                               .isEquals();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 434bd28..22f7733 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
@@ -35,12 +36,19 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the number of vertices, number of edges, and number of triplets in
- * a directed graph.
+ * Compute the following vertex metrics in a directed graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum out degree
+ *  - maximum in degree
+ *  - maximum number of triplets
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -107,8 +115,12 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                long vertexCount = res.getAccumulatorResult(id + "-0");
                long edgeCount = res.getAccumulatorResult(id + "-1");
                long tripletCount = res.getAccumulatorResult(id + "-2");
+               long maximumDegree = res.getAccumulatorResult(id + "-3");
+               long maximumOutDegree = res.getAccumulatorResult(id + "-4");
+               long maximumInDegree = res.getAccumulatorResult(id + "-5");
+               long maximumTriplets = res.getAccumulatorResult(id + "-6");
 
-               return new Result(vertexCount, edgeCount / 2, tripletCount);
+               return new Result(vertexCount, edgeCount, tripletCount, 
maximumDegree, maximumOutDegree, maximumInDegree, maximumTriplets);
        }
 
        /**
@@ -123,13 +135,17 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                private long vertexCount;
                private long edgeCount;
                private long tripletCount;
+               private long maximumDegree;
+               private long maximumOutDegree;
+               private long maximumInDegree;
+               private long maximumTriplets;
 
                /**
                 * This helper class collects vertex metrics by scanning over 
and
                 * discarding elements from the given DataSet.
                 *
                 * The unique id is required because Flink's accumulator 
namespace is
-                * among all operators.
+                * shared among all operators.
                 *
                 * @param id unique string used for accumulator names
                 */
@@ -147,10 +163,16 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                public void writeRecord(Vertex<T, Degrees> record) throws 
IOException {
                        long degree = record.f1.getDegree().getValue();
                        long outDegree = record.f1.getOutDegree().getValue();
+                       long inDegree = record.f1.getInDegree().getValue();
+                       long triplets = degree * (degree - 1) / 2;
 
                        vertexCount++;
                        edgeCount += outDegree;
-                       tripletCount += degree * (degree - 1) / 2;
+                       tripletCount += triplets;
+                       maximumDegree = Math.max(maximumDegree, degree);
+                       maximumOutDegree = Math.max(maximumOutDegree, 
outDegree);
+                       maximumInDegree = Math.max(maximumInDegree, inDegree);
+                       maximumTriplets = Math.max(maximumTriplets, triplets);
                }
 
                @Override
@@ -158,6 +180,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
                        getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
                        getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(tripletCount));
+                       getRuntimeContext().addAccumulator(id + "-3", new 
LongMaximum(maximumDegree));
+                       getRuntimeContext().addAccumulator(id + "-4", new 
LongMaximum(maximumOutDegree));
+                       getRuntimeContext().addAccumulator(id + "-5", new 
LongMaximum(maximumInDegree));
+                       getRuntimeContext().addAccumulator(id + "-6", new 
LongMaximum(maximumTriplets));
                }
        }
 
@@ -168,11 +194,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                private long vertexCount;
                private long edgeCount;
                private long tripletCount;
+               private long maximumDegree;
+               private long maximumOutDegree;
+               private long maximumInDegree;
+               private long maximumTriplets;
 
-               public Result(long vertexCount, long edgeCount, long 
tripletCount) {
+               public Result(long vertexCount, long edgeCount, long 
tripletCount, long maximumDegree, long maximumOutDegree, long maximumInDegree, 
long maximumTriplets) {
                        this.vertexCount = vertexCount;
                        this.edgeCount = edgeCount;
                        this.tripletCount = tripletCount;
+                       this.maximumDegree = maximumDegree;
+                       this.maximumOutDegree = maximumOutDegree;
+                       this.maximumInDegree = maximumInDegree;
+                       this.maximumTriplets = maximumTriplets;
                }
 
                /**
@@ -202,11 +236,53 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        return tripletCount;
                }
 
+               /**
+                * Get the maximum degree.
+                *
+                * @return maximum degree
+                */
+               public long getMaximumDegree() {
+                       return maximumDegree;
+               }
+
+               /**
+                * Get the maximum out degree.
+                *
+                * @return maximum out degree
+                */
+               public long getMaximumOutDegree() {
+                       return maximumOutDegree;
+               }
+
+               /**
+                * Get the maximum in degree.
+                *
+                * @return maximum in degree
+                */
+               public long getMaximumInDegree() {
+                       return maximumInDegree;
+               }
+
+               /**
+                * Get the maximum number of triplets.
+                *
+                * @return maximum number of triplets
+                */
+               public long getMaximumTriplets() {
+                       return maximumTriplets;
+               }
+
                @Override
                public String toString() {
-                       return "vertex count: " + vertexCount
-                               + ", edge count:" + edgeCount
-                               + ", triplet count: " + tripletCount;
+                       NumberFormat nf = NumberFormat.getInstance();
+
+                       return "vertex count: " + nf.format(vertexCount)
+                               + "; edge count: " + nf.format(edgeCount)
+                               + "; triplet count: " + nf.format(tripletCount)
+                               + "; maximum degree: " + 
nf.format(maximumDegree)
+                               + "; maximum out degree: " + 
nf.format(maximumOutDegree)
+                               + "; maximum in degree: " + 
nf.format(maximumInDegree)
+                               + "; maximum triplets: " + 
nf.format(maximumTriplets);
                }
 
                @Override
@@ -215,6 +291,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .append(vertexCount)
                                .append(edgeCount)
                                .append(tripletCount)
+                               .append(maximumDegree)
+                               .append(maximumOutDegree)
+                               .append(maximumInDegree)
+                               .append(maximumTriplets)
                                .hashCode();
                }
 
@@ -230,6 +310,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .append(vertexCount, rhs.vertexCount)
                                .append(edgeCount, rhs.edgeCount)
                                .append(tripletCount, rhs.tripletCount)
+                               .append(maximumDegree, rhs.maximumDegree)
+                               .append(maximumOutDegree, rhs.maximumOutDegree)
+                               .append(maximumInDegree, rhs.maximumInDegree)
+                               .append(maximumTriplets, rhs.maximumTriplets)
                                .isEquals();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
new file mode 100644
index 0000000..1d5b664
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
+import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the following edge metrics in an undirected graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triangle triplets
+ *  - number of rectangle triplets
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum number of triangle triplets
+ *  - maximum number of rectangle triplets
+ *  - maximum number of triplets
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+       private String id = new AbstractID().toString();
+
+       // Optional configuration
+       private boolean reduceOnTargetId = false;
+
+       private int parallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * The degree can be counted from either the edge source or target IDs.
+        * By default the source IDs are counted. Reducing on target IDs may
+        * optimize the algorithm if the input edge list is sorted by target ID.
+        *
+        * @param reduceOnTargetId set to {@code true} if the input edge list
+        *                         is sorted by target ID
+        * @return this
+        */
+       public EdgeMetrics<K, VV, EV> setReduceOnTargetId(boolean 
reduceOnTargetId) {
+               this.reduceOnTargetId = reduceOnTargetId;
+
+               return this;
+       }
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       /*
+        * Implementation notes:
+        *
+        * Use aggregator to replace SumEdgeStats when aggregators are 
rewritten to use
+        *   a hash-combineable hashed-reduce.
+        */
+
+       @Override
+       public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               super.run(input);
+
+               // s, t, (d(s), d(t))
+               DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> 
edgeDegreePair = input
+                       .run(new EdgeDegreePair<K, VV, EV>()
+                               .setReduceOnTargetId(reduceOnTargetId)
+                               .setParallelism(parallelism));
+
+               // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == 
deg(v) and u < v)
+               DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = 
edgeDegreePair
+                       .map(new EdgeStats<K, EV>())
+                               .setParallelism(parallelism)
+                               .name("Edge stats")
+                       .groupBy(0)
+                       .reduce(new SumEdgeStats<K>())
+                               .setCombineHint(CombineHint.HASH)
+                               .setParallelism(parallelism)
+                               .name("Sum edge stats");
+
+               edgeStats
+                       .output(new EdgeMetricsHelper<K, EV>(id))
+                               .setParallelism(parallelism)
+                               .name("Edge metrics");
+
+               return this;
+       }
+
+       @Override
+       public Result getResult() {
+               JobExecutionResult res = env.getLastJobExecutionResult();
+
+               long vertexCount = res.getAccumulatorResult(id + "-0");
+               long edgeCount = res.getAccumulatorResult(id + "-1");
+               long triangleTripletCount = res.getAccumulatorResult(id + "-2");
+               long rectangleTripletCount = res.getAccumulatorResult(id + 
"-3");
+               long tripletCount = res.getAccumulatorResult(id + "-4");
+               long maximumDegree = res.getAccumulatorResult(id + "-5");
+               long maximumTriangleTriplets = res.getAccumulatorResult(id + 
"-6");
+               long maximumRectangleTriplets = res.getAccumulatorResult(id + 
"-7");
+               long maximumTriplets = res.getAccumulatorResult(id + "-8");
+
+               return new Result(vertexCount, edgeCount / 2, 
triangleTripletCount, rectangleTripletCount, tripletCount,
+                       maximumDegree, maximumTriangleTriplets, 
maximumRectangleTriplets, maximumTriplets);
+       }
+
+       /**
+        * Evaluates each edge and emits a tuple containing the source vertex 
ID,
+        * the source vertex degree, and a value of zero or one indicating the
+        * low-order count. The low-order count is one if the source vertex 
degree
+        * is less than the target vertex degree or if the degrees are equal and
+        * the source vertex ID compares lower than the target vertex ID; 
otherwise
+        * the low-order count is zero.
+        *
+        * @param <T> ID type
+        * @param <ET> edge value type
+        */
+       @FunctionAnnotation.ForwardedFields("0; 2.1->1")
+       private static class EdgeStats<T extends Comparable<T>, ET>
+       implements MapFunction<Edge<T, Tuple3<ET, LongValue, LongValue>>, 
Tuple3<T, LongValue, LongValue>> {
+               private LongValue zero = new LongValue(0);
+
+               private LongValue one = new LongValue(1);
+
+               private Tuple3<T, LongValue, LongValue> output = new Tuple3<>();
+
+               @Override
+               public Tuple3<T, LongValue, LongValue> map(Edge<T, Tuple3<ET, 
LongValue, LongValue>> edge)
+                               throws Exception {
+                       Tuple3<ET, LongValue, LongValue> degrees = edge.f2;
+
+                       output.f0 = edge.f0;
+                       output.f1 = degrees.f1;
+
+                       long sourceDegree = degrees.f1.getValue();
+                       long targetDegree = degrees.f2.getValue();
+
+                       if (sourceDegree < targetDegree ||
+                                       (sourceDegree == targetDegree && 
edge.f0.compareTo(edge.f1) < 0)) {
+                               output.f2 = one;
+                       } else {
+                               output.f2 = zero;
+                       }
+
+                       return output;
+               }
+       }
+
+       /**
+        * Sums the low-order counts.
+        *
+        * @param <T> ID type
+        */
+       private static class SumEdgeStats<T>
+       implements ReduceFunction<Tuple3<T, LongValue, LongValue>> {
+               @Override
+               public Tuple3<T, LongValue, LongValue> reduce(Tuple3<T, 
LongValue, LongValue> value1, Tuple3<T, LongValue, LongValue> value2)
+                               throws Exception {
+                       value1.f2.setValue(value1.f2.getValue() + 
value2.f2.getValue());
+                       return value1;
+               }
+       }
+
+       /**
+        * Helper class to collect edge metrics.
+        *
+        * @param <T> ID type
+        */
+       private static class EdgeMetricsHelper<T extends Comparable<T>, ET>
+       extends RichOutputFormat<Tuple3<T, LongValue, LongValue>> {
+               private final String id;
+
+               private long vertexCount;
+               private long edgeCount;
+               private long triangleTripletCount;
+               private long rectangleTripletCount;
+               private long tripletCount;
+               private long maximumDegree;
+               private long maximumTriangleTriplets;
+               private long maximumRectangleTriplets;
+               private long maximumTriplets;
+
+               /**
+                * This helper class collects edge metrics by scanning over and
+                * discarding elements from the given DataSet.
+                *
+                * The unique id is required because Flink's accumulator 
namespace is
+                * shared among all operators.
+                *
+                * @param id unique string used for accumulator names
+                */
+               public EdgeMetricsHelper(String id) {
+                       this.id = id;
+               }
+
+               @Override
+               public void configure(Configuration parameters) {}
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws 
IOException {}
+
+               @Override
+               public void writeRecord(Tuple3<T, LongValue, LongValue> record) 
throws IOException {
+                       long degree = record.f1.getValue();
+                       long lowDegree = record.f2.getValue();
+                       long highDegree = degree - lowDegree;
+
+                       long triangleTriplets = lowDegree * (lowDegree - 1) / 2;
+                       long rectangleTriplets = triangleTriplets + lowDegree * 
highDegree;
+                       long triplets = degree * (degree - 1) / 2;
+
+                       vertexCount++;
+                       edgeCount += degree;
+                       triangleTripletCount += triangleTriplets;
+                       rectangleTripletCount += rectangleTriplets;
+                       tripletCount += triplets;
+                       maximumDegree = Math.max(maximumDegree, degree);
+                       maximumTriangleTriplets = 
Math.max(maximumTriangleTriplets, triangleTriplets);
+                       maximumRectangleTriplets = 
Math.max(maximumRectangleTriplets, rectangleTriplets);
+                       maximumTriplets = Math.max(maximumTriplets, triplets);
+               }
+
+               @Override
+               public void close() throws IOException {
+                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
+                       getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
+                       getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(triangleTripletCount));
+                       getRuntimeContext().addAccumulator(id + "-3", new 
LongCounter(rectangleTripletCount));
+                       getRuntimeContext().addAccumulator(id + "-4", new 
LongCounter(tripletCount));
+                       getRuntimeContext().addAccumulator(id + "-5", new 
LongMaximum(maximumDegree));
+                       getRuntimeContext().addAccumulator(id + "-6", new 
LongMaximum(maximumTriangleTriplets));
+                       getRuntimeContext().addAccumulator(id + "-7", new 
LongMaximum(maximumRectangleTriplets));
+                       getRuntimeContext().addAccumulator(id + "-8", new 
LongMaximum(maximumTriplets));
+               }
+       }
+
+       /**
+        * Wraps edge metrics.
+        */
+       public static class Result {
+               private long vertexCount;
+               private long edgeCount;
+               private long triangleTripletCount;
+               private long rectangleTripletCount;
+               private long tripletCount;
+               private long maximumDegree;
+               private long maximumTriangleTriplets;
+               private long maximumRectangleTriplets;
+               private long maximumTriplets;
+
+               public Result(long vertexCount, long edgeCount, long 
triangleTripletCount, long rectangleTripletCount, long tripletCount,
+                               long maximumDegree, long 
maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) {
+                       this.vertexCount = vertexCount;
+                       this.edgeCount = edgeCount;
+                       this.triangleTripletCount = triangleTripletCount;
+                       this.rectangleTripletCount = rectangleTripletCount;
+                       this.tripletCount = tripletCount;
+                       this.maximumDegree = maximumDegree;
+                       this.maximumTriangleTriplets = maximumTriangleTriplets;
+                       this.maximumRectangleTriplets = 
maximumRectangleTriplets;
+                       this.maximumTriplets = maximumTriplets;
+               }
+
+               /**
+                * Get the number of vertices.
+                *
+                * @return number of vertices
+                */
+               public long getNumberOfVertices() {
+                       return vertexCount;
+               }
+
+               /**
+                * Get the number of edges.
+                *
+                * @return number of edges
+                */
+               public long getNumberOfEdges() {
+                       return edgeCount;
+               }
+
+               /**
+                * Get the number of triangle triplets.
+                *
+                * @return number of triangle triplets
+                */
+               public long getNumberOfTriangleTriplets() {
+                       return triangleTripletCount;
+               }
+
+               /**
+                * Get the number of rectangle triplets.
+                *
+                * @return number of rectangle triplets
+                */
+               public long getNumberOfRectangleTriplets() {
+                       return rectangleTripletCount;
+               }
+
+               /**
+                * Get the number of triplets.
+                *
+                * @return number of triplets
+                */
+               public long getNumberOfTriplets() {
+                       return tripletCount;
+               }
+
+               /**
+                * Get the maximum degree.
+                *
+                * @return maximum degree
+                */
+               public long getMaximumDegree() {
+                       return maximumDegree;
+               }
+
+               /**
+                * Get the maximum number of triangle triplets.
+                *
+                * @return maximum number of triangle triplets
+                */
+               public long getMaximumTriangleTriplets() {
+                       return maximumTriangleTriplets;
+               }
+
+               /**
+                * Get the maximum number of rectangle triplets.
+                *
+                * @return maximum number of rectangle triplets
+                */
+               public long getMaximumRectangleTriplets() {
+                       return maximumRectangleTriplets;
+               }
+
+               /**
+                * Get the maximum number of triplets.
+                *
+                * @return maximum number of triplets
+                */
+               public long getMaximumTriplets() {
+                       return maximumTriplets;
+               }
+
+               @Override
+               public String toString() {
+                       NumberFormat nf = NumberFormat.getInstance();
+
+                       return "vertex count: " + nf.format(vertexCount)
+                               + "; edge count: " + nf.format(edgeCount)
+                               + "; triangle triplet count: " + 
nf.format(triangleTripletCount)
+                               + "; rectangle triplet count: " + 
nf.format(rectangleTripletCount)
+                               + "; triplet count: " + nf.format(tripletCount)
+                               + "; maximum degree: " + 
nf.format(maximumDegree)
+                               + "; maximum triangle triplets: " + 
nf.format(maximumTriangleTriplets)
+                               + "; maximum rectangle triplets: " + 
nf.format(maximumRectangleTriplets)
+                               + "; maximum triplets: " + 
nf.format(maximumTriplets);
+               }
+
+               @Override
+               public int hashCode() {
+                       return new HashCodeBuilder()
+                               .append(vertexCount)
+                               .append(edgeCount)
+                               .append(triangleTripletCount)
+                               .append(rectangleTripletCount)
+                               .append(tripletCount)
+                               .append(maximumDegree)
+                               .append(maximumTriangleTriplets)
+                               .append(maximumRectangleTriplets)
+                               .append(maximumTriplets)
+                               .hashCode();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == null) { return false; }
+                       if (obj == this) { return true; }
+                       if (obj.getClass() != getClass()) { return false; }
+
+                       Result rhs = (Result)obj;
+
+                       return new EqualsBuilder()
+                               .append(vertexCount, rhs.vertexCount)
+                               .append(edgeCount, rhs.edgeCount)
+                               .append(triangleTripletCount, 
rhs.triangleTripletCount)
+                               .append(rectangleTripletCount, 
rhs.rectangleTripletCount)
+                               .append(tripletCount, rhs.tripletCount)
+                               .append(maximumDegree, rhs.maximumDegree)
+                               .append(maximumTriangleTriplets, 
rhs.maximumTriangleTriplets)
+                               .append(maximumRectangleTriplets, 
rhs.maximumRectangleTriplets)
+                               .append(maximumTriplets, rhs.maximumTriplets)
+                               .isEquals();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 3c26e43..d04fa7b 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
@@ -35,12 +36,17 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the number of vertices, number of edges, and number of triplets in
- * an undirected graph.
+ * Compute the following vertex metrics in an undirected graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum number of triplets
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -125,8 +131,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                long vertexCount = res.getAccumulatorResult(id + "-0");
                long edgeCount = res.getAccumulatorResult(id + "-1");
                long tripletCount = res.getAccumulatorResult(id + "-2");
+               long maximumDegree = res.getAccumulatorResult(id + "-3");
+               long maximumTriplets = res.getAccumulatorResult(id + "-4");
 
-               return new Result(vertexCount, edgeCount / 2, tripletCount);
+               return new Result(vertexCount, edgeCount / 2, tripletCount, 
maximumDegree, maximumTriplets);
        }
 
        /**
@@ -141,13 +149,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                private long vertexCount;
                private long edgeCount;
                private long tripletCount;
+               private long maximumDegree;
+               private long maximumTriplets;
 
                /**
                 * This helper class collects vertex metrics by scanning over 
and
                 * discarding elements from the given DataSet.
                 *
                 * The unique id is required because Flink's accumulator 
namespace is
-                * among all operators.
+                * shared among all operators.
                 *
                 * @param id unique string used for accumulator names
                 */
@@ -164,10 +174,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                @Override
                public void writeRecord(Vertex<T, LongValue> record) throws 
IOException {
                        long degree = record.f1.getValue();
+                       long triplets = degree * (degree - 1) / 2;
 
                        vertexCount++;
                        edgeCount += degree;
-                       tripletCount += degree * (degree - 1) / 2;
+                       tripletCount += triplets;
+                       maximumDegree = Math.max(maximumDegree, degree);
+                       maximumTriplets = Math.max(maximumTriplets, triplets);
                }
 
                @Override
@@ -175,6 +188,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
                        getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
                        getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(tripletCount));
+                       getRuntimeContext().addAccumulator(id + "-3", new 
LongMaximum(maximumDegree));
+                       getRuntimeContext().addAccumulator(id + "-4", new 
LongMaximum(maximumTriplets));
                }
        }
 
@@ -185,11 +200,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                private long vertexCount;
                private long edgeCount;
                private long tripletCount;
+               private long maximumDegree;
+               private long maximumTriplets;
 
-               public Result(long vertexCount, long edgeCount, long 
tripletCount) {
+               public Result(long vertexCount, long edgeCount, long 
tripletCount, long maximumDegree, long maximumTriplets) {
                        this.vertexCount = vertexCount;
                        this.edgeCount = edgeCount;
                        this.tripletCount = tripletCount;
+                       this.maximumDegree = maximumDegree;
+                       this.maximumTriplets = maximumTriplets;
                }
 
                /**
@@ -219,11 +238,33 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        return tripletCount;
                }
 
+               /**
+                * Get the largest degree found on any single vertex.
+                *
+                * @return the maximum vertex degree
+                */
+               public long getMaximumDegree() {
+                       return this.maximumDegree;
+               }
+
+               /**
+                * Get the largest per-vertex triplet count, degree * (degree - 1) / 2.
+                *
+                * @return the maximum number of triplets counted for a single vertex
+                */
+               public long getMaximumTriplets() {
+                       return this.maximumTriplets;
+               }
+
                @Override
                public String toString() {
-                       return "vertex count: " + vertexCount
-                                       + ", edge count:" + edgeCount
-                                       + ", triplet count: " + tripletCount;
+                       NumberFormat nf = NumberFormat.getInstance();
+
+                       return "vertex count: " + nf.format(vertexCount)
+                               + "; edge count: " + nf.format(edgeCount)
+                               + "; triplet count: " + nf.format(tripletCount)
+                               + "; maximum degree: " + 
nf.format(maximumDegree)
+                               + "; maximum triplets: " + 
nf.format(maximumTriplets);
                }
 
                @Override
@@ -232,6 +273,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .append(vertexCount)
                                .append(edgeCount)
                                .append(tripletCount)
+                               .append(maximumDegree)
+                               .append(maximumTriplets)
                                .hashCode();
                }
 
@@ -247,6 +290,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .append(vertexCount, rhs.vertexCount)
                                .append(edgeCount, rhs.edgeCount)
                                .append(tripletCount, rhs.tripletCount)
+                               .append(maximumDegree, rhs.maximumDegree)
+                               .append(maximumTriplets, rhs.maximumTriplets)
                                .isEquals();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
new file mode 100644
index 0000000..af5a154
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.directed;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the directed {@link EdgeMetrics} analytic against the shared graphs from {@link AsmTestBase}. */
+public class EdgeMetricsTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 2, 3, 1, 3, 6);
+
+               Result edgeMetrics = new EdgeMetrics<IntValue, NullValue, NullValue>()
+                       .run(directedSimpleGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+
+       @Test
+       public void testWithCompleteGraph()
+                       throws Exception {
+               long expectedDegree = completeGraphVertexCount - 1;
+               long expectedEdges = completeGraphVertexCount * expectedDegree;
+               long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+               long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
+
+               Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets,
+                       expectedDegree, expectedDegree, expectedDegree,
+                       expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets);
+
+               Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+                       .run(completeGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+
+       @Test
+       public void testWithEmptyGraph()
+                       throws Exception {
+               // with no edges to process, every metric is expected to be zero
+               Result expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+               Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+                       .run(emptyGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               Result expectedResult = new Result(902, 12009, 107817, 315537, 1003442, 463, 334, 342, 820, 3822, 106953);
+
+               Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+                       .run(directedRMatGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
index 8145abd..e4362c0 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
@@ -34,10 +34,10 @@ extends AsmTestBase {
        @Test
        public void testWithSimpleGraph()
                        throws Exception {
-               Result expectedResult = new Result(6, 7, 13);
+               Result expectedResult = new Result(6, 7, 13, 4, 2, 3, 6);
 
                Result vertexMetrics = new VertexMetrics<IntValue, NullValue, 
NullValue>()
-                       .run(undirectedSimpleGraph)
+                       .run(directedSimpleGraph)
                        .execute();
 
                assertEquals(expectedResult, vertexMetrics);
@@ -47,10 +47,11 @@ extends AsmTestBase {
        public void testWithCompleteGraph()
                        throws Exception {
                long expectedDegree = completeGraphVertexCount - 1;
-               long expectedEdges = completeGraphVertexCount * expectedDegree 
/ 2;
-               long expectedTriplets = completeGraphVertexCount * 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+               long expectedEdges = completeGraphVertexCount * expectedDegree;
+               long expectedMaximumTriplets = 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+               long expectedTriplets = completeGraphVertexCount * 
expectedMaximumTriplets;
 
-               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets);
+               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets, expectedDegree, expectedDegree, 
expectedDegree, expectedMaximumTriplets);
 
                Result vertexMetrics = new VertexMetrics<LongValue, NullValue, 
NullValue>()
                        .run(completeGraph)
@@ -64,7 +65,7 @@ extends AsmTestBase {
                        throws Exception {
                Result expectedResult;
 
-               expectedResult = new Result(0, 0, 0);
+               expectedResult = new Result(0, 0, 0, 0, 0, 0, 0);
 
                Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .setIncludeZeroDegreeVertices(false)
@@ -73,7 +74,7 @@ extends AsmTestBase {
 
                assertEquals(withoutZeroDegreeVertices, expectedResult);
 
-               expectedResult = new Result(3, 0, 0);
+               expectedResult = new Result(3, 0, 0, 0, 0, 0, 0);
 
                Result withZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .setIncludeZeroDegreeVertices(true)
@@ -86,10 +87,10 @@ extends AsmTestBase {
        @Test
        public void testWithRMatGraph()
                        throws Exception {
-               Result expectedResult = new Result(902, 10442, 1003442);
+               Result expectedResult = new Result(902, 12009, 1003442, 463, 
334, 342, 106953);
 
                Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
-                       .run(undirectedRMatGraph)
+                       .run(directedRMatGraph)
                        .execute();
 
                assertEquals(expectedResult, withoutZeroDegreeVertices);

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
new file mode 100644
index 0000000..b300d66
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the undirected {@link EdgeMetrics} analytic against the shared graphs from {@link AsmTestBase}. */
+public class EdgeMetricsTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 1, 3, 6);
+
+               Result edgeMetrics = new EdgeMetrics<IntValue, NullValue, NullValue>()
+                       .run(undirectedSimpleGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+
+       @Test
+       public void testWithCompleteGraph()
+                       throws Exception {
+               long expectedDegree = completeGraphVertexCount - 1;
+               long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
+               long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+               long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
+
+               Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets,
+                       expectedDegree, expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets);
+
+               Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+                       .run(completeGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+
+       @Test
+       public void testWithEmptyGraph()
+                       throws Exception {
+               // with no edges to process, every metric is expected to be zero
+               Result expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+               Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+                       .run(emptyGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               Result expectedResult = new Result(902, 10442, 107817, 315537, 1003442, 463, 820, 3822, 106953);
+
+               Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+                       .run(undirectedRMatGraph)
+                       .execute();
+
+               assertEquals(expectedResult, edgeMetrics);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
index a36ca94..8f7e1da 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -34,7 +34,7 @@ extends AsmTestBase {
        @Test
        public void testWithSimpleGraph()
                        throws Exception {
-               Result expectedResult = new Result(6, 7, 13);
+               Result expectedResult = new Result(6, 7, 13, 4, 6);
 
                Result vertexMetrics = new VertexMetrics<IntValue, NullValue, 
NullValue>()
                        .run(undirectedSimpleGraph)
@@ -48,9 +48,11 @@ extends AsmTestBase {
                        throws Exception {
                long expectedDegree = completeGraphVertexCount - 1;
                long expectedEdges = completeGraphVertexCount * expectedDegree 
/ 2;
-               long expectedTriplets = completeGraphVertexCount * 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+               long expectedMaximumTriplets = 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+               long expectedTriplets = completeGraphVertexCount * 
expectedMaximumTriplets;
 
-               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets);
+               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets,
+                       expectedDegree, expectedMaximumTriplets);
 
                Result vertexMetrics = new VertexMetrics<LongValue, NullValue, 
NullValue>()
                        .run(completeGraph)
@@ -64,7 +66,7 @@ extends AsmTestBase {
                        throws Exception {
                Result expectedResult;
 
-               expectedResult = new Result(0, 0, 0);
+               expectedResult = new Result(0, 0, 0, 0, 0);
 
                Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .setIncludeZeroDegreeVertices(false)
@@ -73,7 +75,7 @@ extends AsmTestBase {
 
                assertEquals(withoutZeroDegreeVertices, expectedResult);
 
-               expectedResult = new Result(3, 0, 0);
+               expectedResult = new Result(3, 0, 0, 0, 0);
 
                Result withZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .setIncludeZeroDegreeVertices(true)
@@ -86,7 +88,7 @@ extends AsmTestBase {
        @Test
        public void testWithRMatGraph()
                        throws Exception {
-               Result expectedResult = new Result(902, 10442, 1003442);
+               Result expectedResult = new Result(902, 10442, 1003442, 463, 
106953);
 
                Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .run(undirectedRMatGraph)

Reply via email to