Repository: flink
Updated Branches:
  refs/heads/master 6a6eeb9d5 -> f025c455f


[FLINK-4963] [gelly] Tabulate edge direction for directed VertexMetrics

The current implementation simply counts edges. We can do one better and
tabulate unidirectional (u:v but no v:u) and bidirectional edges (u:v
and v:u).

This is effectively the 'dyadic census'.

This commit also makes edge metrics distinct from vertex metrics.
Previously EdgeMetrics has always been a superset of VertexMetrics.

This closes #2725


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f025c455
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f025c455
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f025c455

Branch: refs/heads/master
Commit: f025c455f2a2e4bc5bd5bb39b47c3dc1d1e3b117
Parents: 6a6eeb9
Author: Greg Hogan <[email protected]>
Authored: Fri Oct 28 12:28:41 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Tue Nov 8 09:41:41 2016 -0500

----------------------------------------------------------------------
 .../org/apache/flink/graph/AnalyticHelper.java  |  79 +++++++
 .../directed/AverageClusteringCoefficient.java  |  46 ++--
 .../AverageClusteringCoefficient.java           |  46 ++--
 .../library/metric/directed/EdgeMetrics.java    | 219 +++----------------
 .../library/metric/directed/VertexMetrics.java  | 140 +++++++-----
 .../library/metric/undirected/EdgeMetrics.java  | 184 +++-------------
 .../metric/undirected/VertexMetrics.java        |  68 +++---
 .../metric/directed/EdgeMetricsTest.java        |  12 +-
 .../metric/directed/VertexMetricsTest.java      |  13 +-
 .../metric/undirected/EdgeMetricsTest.java      |  11 +-
 10 files changed, 300 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
new file mode 100644
index 0000000..b07a8c3
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A {@link GraphAnalytic} computes over a DataSet and returns the results via
+ * Flink accumulators. This computation is cheaply performed in a terminating
+ * {@link RichOutputFormat}.
+ *
+ * This class simplifies the creation of analytic helpers by providing 
pass-through
+ * methods for adding and getting accumulators. Each accumulator name is 
prefixed
+ * with a random string since Flink accumulators share a per-job global 
namespace.
+ * This class also provides empty implementations of {@link 
RichOutputFormat#open}
+ * and {@link RichOutputFormat#close}.
+ *
+ * @param <T> element type
+ */
+public abstract class AnalyticHelper<T>
+extends RichOutputFormat<T> {
+
+       private static final String SEPARATOR = "-";
+
+       private String id = new AbstractID().toString();
+
+       @Override
+       public void configure(Configuration parameters) {}
+
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {}
+
+       /**
+        * Adds an accumulator by prepending the given name with a random 
string.
+        *
+        * @param name The name of the accumulator
+        * @param accumulator The accumulator
+        * @param <V> Type of values that are added to the accumulator
+        * @param <A> Type of the accumulator result as it will be reported to 
the client
+        */
+       public <V, A extends Serializable> void addAccumulator(String name, 
Accumulator<V, A> accumulator) {
+               getRuntimeContext().addAccumulator(id + SEPARATOR + name, 
accumulator);
+       }
+
+       /**
+        * Gets the accumulator with the given name. Returns {@code null}, if 
no accumulator with
+        * that name was produced.
+        *
+        * @param accumulatorName The name of the accumulator
+        * @param <A> The generic type of the accumulator value
+        * @return The value of the accumulator with the given name
+        */
+       public <A> A getAccumulator(ExecutionEnvironment env, String 
accumulatorName) {
+               return env.getLastJobExecutionResult().getAccumulatorResult(id 
+ SEPARATOR + accumulatorName);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
index f589d04..c0a80d1 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -20,17 +20,14 @@ package org.apache.flink.graph.library.clustering.directed;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.AnalyticHelper;
 import 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
 import org.apache.flink.types.CopyableValue;
-import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
 
@@ -47,7 +44,11 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class AverageClusteringCoefficient<K extends Comparable<K> & 
CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-       private String id = new AbstractID().toString();
+       private static final String VERTEX_COUNT = "vertexCount";
+
+       private static final String SUM_OF_LOCAL_CLUSTERING_COEFFICIENT = 
"sumOfLocalClusteringCoefficient";
+
+       private AverageClusteringCoefficientHelper<K> 
averageClusteringCoefficientHelper;
 
        // Optional configuration
        private int littleParallelism = PARALLELISM_DEFAULT;
@@ -80,8 +81,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        .run(new LocalClusteringCoefficient<K, VV, EV>()
                                .setLittleParallelism(littleParallelism));
 
+               averageClusteringCoefficientHelper = new 
AverageClusteringCoefficientHelper<>();
+
                localClusteringCoefficient
-                       .output(new AverageClusteringCoefficientHelper<K>(id))
+                       .output(averageClusteringCoefficientHelper)
                                .name("Average clustering coefficient");
 
                return this;
@@ -89,10 +92,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
        @Override
        public Result getResult() {
-               JobExecutionResult res = env.getLastJobExecutionResult();
-
-               long vertexCount = res.getAccumulatorResult(id + "-0");
-               double sumOfLocalClusteringCoefficient = 
res.getAccumulatorResult(id + "-1");
+               long vertexCount = 
averageClusteringCoefficientHelper.getAccumulator(env, VERTEX_COUNT);
+               double sumOfLocalClusteringCoefficient = 
averageClusteringCoefficientHelper.getAccumulator(env, 
SUM_OF_LOCAL_CLUSTERING_COEFFICIENT);
 
                return new Result(vertexCount, sumOfLocalClusteringCoefficient);
        }
@@ -103,28 +104,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         * @param <T> ID type
         */
        private static class AverageClusteringCoefficientHelper<T>
-       extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
-               private final String id;
-
+       extends AnalyticHelper<LocalClusteringCoefficient.Result<T>> {
                private long vertexCount;
                private double sumOfLocalClusteringCoefficient;
 
-               /**
-                * The unique id is required because Flink's accumulator 
namespace is
-                * shared among all operators.
-                *
-                * @param id unique string used for accumulator names
-                */
-               public AverageClusteringCoefficientHelper(String id) {
-                       this.id = id;
-               }
-
-               @Override
-               public void configure(Configuration parameters) {}
-
-               @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {}
-
                @Override
                public void writeRecord(LocalClusteringCoefficient.Result<T> 
record) throws IOException {
                        vertexCount++;
@@ -138,8 +121,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                @Override
                public void close() throws IOException {
-                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
-                       getRuntimeContext().addAccumulator(id + "-1", new 
DoubleCounter(sumOfLocalClusteringCoefficient));
+                       addAccumulator(VERTEX_COUNT, new 
LongCounter(vertexCount));
+                       addAccumulator(SUM_OF_LOCAL_CLUSTERING_COEFFICIENT, new 
DoubleCounter(sumOfLocalClusteringCoefficient));
                }
        }
 
@@ -148,7 +131,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         */
        public static class Result {
                private long vertexCount;
-
                private double averageLocalClusteringCoefficient;
 
                /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index 03dbc71..3d4a88e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -20,17 +20,14 @@ package 
org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.AnalyticHelper;
 import 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
 import org.apache.flink.types.CopyableValue;
-import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
 
@@ -47,7 +44,11 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class AverageClusteringCoefficient<K extends Comparable<K> & 
CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-       private String id = new AbstractID().toString();
+       private static final String VERTEX_COUNT = "vertexCount";
+
+       private static final String SUM_OF_LOCAL_CLUSTERING_COEFFICIENT = 
"sumOfLocalClusteringCoefficient";
+
+       private AverageClusteringCoefficientHelper<K> 
averageClusteringCoefficientHelper;
 
        // Optional configuration
        private int littleParallelism = PARALLELISM_DEFAULT;
@@ -80,8 +81,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        .run(new LocalClusteringCoefficient<K, VV, EV>()
                                .setLittleParallelism(littleParallelism));
 
+               averageClusteringCoefficientHelper = new 
AverageClusteringCoefficientHelper<>();
+
                localClusteringCoefficient
-                       .output(new AverageClusteringCoefficientHelper<K>(id))
+                       .output(averageClusteringCoefficientHelper)
                                .name("Average clustering coefficient");
 
                return this;
@@ -89,10 +92,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
        @Override
        public Result getResult() {
-               JobExecutionResult res = env.getLastJobExecutionResult();
-
-               long vertexCount = res.getAccumulatorResult(id + "-0");
-               double sumOfLocalClusteringCoefficient = 
res.getAccumulatorResult(id + "-1");
+               long vertexCount = 
averageClusteringCoefficientHelper.getAccumulator(env, VERTEX_COUNT);
+               double sumOfLocalClusteringCoefficient = 
averageClusteringCoefficientHelper.getAccumulator(env, 
SUM_OF_LOCAL_CLUSTERING_COEFFICIENT);
 
                return new Result(vertexCount, sumOfLocalClusteringCoefficient);
        }
@@ -103,28 +104,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         * @param <T> ID type
         */
        private static class AverageClusteringCoefficientHelper<T>
-       extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
-               private final String id;
-
+       extends AnalyticHelper<LocalClusteringCoefficient.Result<T>> {
                private long vertexCount;
                private double sumOfLocalClusteringCoefficient;
 
-               /**
-                * The unique id is required because Flink's accumulator 
namespace is
-                * shared among all operators.
-                *
-                * @param id unique string used for accumulator names
-                */
-               public AverageClusteringCoefficientHelper(String id) {
-                       this.id = id;
-               }
-
-               @Override
-               public void configure(Configuration parameters) {}
-
-               @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {}
-
                @Override
                public void writeRecord(LocalClusteringCoefficient.Result<T> 
record) throws IOException {
                        vertexCount++;
@@ -138,8 +121,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                @Override
                public void close() throws IOException {
-                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
-                       getRuntimeContext().addAccumulator(id + "-1", new 
DoubleCounter(sumOfLocalClusteringCoefficient));
+                       addAccumulator(VERTEX_COUNT, new 
LongCounter(vertexCount));
+                       addAccumulator(SUM_OF_LOCAL_CLUSTERING_COEFFICIENT, new 
DoubleCounter(sumOfLocalClusteringCoefficient));
                }
        }
 
@@ -148,7 +131,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         */
        public static class Result {
                private long vertexCount;
-
                private double averageLocalClusteringCoefficient;
 
                /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 648fb76..07f4eed 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -20,13 +20,11 @@ package org.apache.flink.graph.library.metric.directed;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.io.RichOutputFormat;
 import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
@@ -36,12 +34,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
@@ -51,17 +49,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Compute the following edge metrics in a directed graph:
- *  - number of vertices
- *  - number of edges
  *  - number of triangle triplets
  *  - number of rectangle triplets
- *  - number of triplets
- *  - maximum degree
- *  - maximum out degree
- *  - maximum in degree
  *  - maximum number of triangle triplets
  *  - maximum number of rectangle triplets
- *  - maximum number of triplets
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -70,7 +61,15 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-       private String id = new AbstractID().toString();
+       private static final String TRIANGLE_TRIPLET_COUNT = 
"triangleTripletCount";
+
+       private static final String RECTANGLE_TRIPLET_COUNT = 
"rectangleTripletCount";
+
+       private static final String MAXIMUM_TRIANGLE_TRIPLETS = 
"maximumTriangleTriplets";
+
+       private static final String MAXIMUM_RECTANGLE_TRIPLETS = 
"maximumRectangleTriplets";
+
+       private EdgeMetricsHelper<K> edgeMetricsHelper;
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -121,8 +120,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .setParallelism(parallelism)
                                .name("Sum edge stats");
 
+               edgeMetricsHelper = new EdgeMetricsHelper<>();
+
                edgeStats
-                       .output(new EdgeMetricsHelper<K, EV>(id))
+                       .output(edgeMetricsHelper)
                                .setParallelism(parallelism)
                                .name("Edge metrics");
 
@@ -131,23 +132,14 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
        @Override
        public Result getResult() {
-               JobExecutionResult res = env.getLastJobExecutionResult();
-
-               long vertexCount = res.getAccumulatorResult(id + "-0");
-               long edgeCount = res.getAccumulatorResult(id + "-1");
-               long triangleTripletCount = res.getAccumulatorResult(id + "-2");
-               long rectangleTripletCount = res.getAccumulatorResult(id + 
"-3");
-               long tripletCount = res.getAccumulatorResult(id + "-4");
-               long maximumDegree = res.getAccumulatorResult(id + "-5");
-               long maximumOutDegree = res.getAccumulatorResult(id + "-6");
-               long maximumInDegree = res.getAccumulatorResult(id + "-7");
-               long maximumTriangleTriplets = res.getAccumulatorResult(id + 
"-8");
-               long maximumRectangleTriplets = res.getAccumulatorResult(id + 
"-9");
-               long maximumTriplets = res.getAccumulatorResult(id + "-a");
-
-               return new Result(vertexCount, edgeCount, triangleTripletCount, 
rectangleTripletCount, tripletCount,
-                       maximumDegree, maximumOutDegree, maximumInDegree,
-                       maximumTriangleTriplets, maximumRectangleTriplets, 
maximumTriplets);
+               long triangleTripletCount = 
edgeMetricsHelper.getAccumulator(env, TRIANGLE_TRIPLET_COUNT);
+               long rectangleTripletCount = 
edgeMetricsHelper.getAccumulator(env, RECTANGLE_TRIPLET_COUNT);
+               long maximumTriangleTriplets = 
edgeMetricsHelper.getAccumulator(env, MAXIMUM_TRIANGLE_TRIPLETS);
+               long maximumRectangleTriplets = 
edgeMetricsHelper.getAccumulator(env, MAXIMUM_RECTANGLE_TRIPLETS);
+
+               // each edge is counted twice, once from each vertex, so must 
be halved
+               return new Result(triangleTripletCount, rectangleTripletCount,
+                       maximumTriangleTriplets, maximumRectangleTriplets);
        }
 
        /**
@@ -238,34 +230,12 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         *
         * @param <T> ID type
         */
-       private static class EdgeMetricsHelper<T extends Comparable<T>, ET>
-       extends RichOutputFormat<Tuple3<T, Degrees, LongValue>> {
-               private final String id;
-
-               private long vertexCount;
-               private long edgeCount;
+       private static class EdgeMetricsHelper<T extends Comparable<T>>
+       extends AnalyticHelper<Tuple3<T, Degrees, LongValue>> {
                private long triangleTripletCount;
                private long rectangleTripletCount;
-               private long tripletCount;
-               private long maximumDegree;
-               private long maximumOutDegree;
-               private long maximumInDegree;
                private long maximumTriangleTriplets;
                private long maximumRectangleTriplets;
-               private long maximumTriplets;
-
-               /**
-                * This helper class collects edge metrics by scanning over and
-                * discarding elements from the given DataSet.
-                *
-                * The unique id is required because Flink's accumulator 
namespace is
-                * among all operators.
-                *
-                * @param id unique string used for accumulator names
-                */
-               public EdgeMetricsHelper(String id) {
-                       this.id = id;
-               }
 
                @Override
                public void configure(Configuration parameters) {}
@@ -277,42 +247,26 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                public void writeRecord(Tuple3<T, Degrees, LongValue> record) 
throws IOException {
                        Degrees degrees = record.f1;
                        long degree = degrees.getDegree().getValue();
-                       long outDegree = degrees.getOutDegree().getValue();
-                       long inDegree = degrees.getInDegree().getValue();
 
                        long lowDegree = record.f2.getValue();
                        long highDegree = degree - lowDegree;
 
                        long triangleTriplets = lowDegree * (lowDegree - 1) / 2;
                        long rectangleTriplets = triangleTriplets + lowDegree * 
highDegree;
-                       long triplets = degree * (degree - 1) / 2;
 
-                       vertexCount++;
-                       edgeCount += outDegree;
                        triangleTripletCount += triangleTriplets;
                        rectangleTripletCount += rectangleTriplets;
-                       tripletCount += triplets;
-                       maximumDegree = Math.max(maximumDegree, degree);
-                       maximumOutDegree = Math.max(maximumOutDegree, 
outDegree);
-                       maximumInDegree = Math.max(maximumInDegree, inDegree);
+
                        maximumTriangleTriplets = 
Math.max(maximumTriangleTriplets, triangleTriplets);
                        maximumRectangleTriplets = 
Math.max(maximumRectangleTriplets, rectangleTriplets);
-                       maximumTriplets = Math.max(maximumTriplets, triplets);
                }
 
                @Override
                public void close() throws IOException {
-                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
-                       getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
-                       getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(triangleTripletCount));
-                       getRuntimeContext().addAccumulator(id + "-3", new 
LongCounter(rectangleTripletCount));
-                       getRuntimeContext().addAccumulator(id + "-4", new 
LongCounter(tripletCount));
-                       getRuntimeContext().addAccumulator(id + "-5", new 
LongMaximum(maximumDegree));
-                       getRuntimeContext().addAccumulator(id + "-6", new 
LongMaximum(maximumOutDegree));
-                       getRuntimeContext().addAccumulator(id + "-7", new 
LongMaximum(maximumInDegree));
-                       getRuntimeContext().addAccumulator(id + "-8", new 
LongMaximum(maximumTriangleTriplets));
-                       getRuntimeContext().addAccumulator(id + "-9", new 
LongMaximum(maximumRectangleTriplets));
-                       getRuntimeContext().addAccumulator(id + "-a", new 
LongMaximum(maximumTriplets));
+                       addAccumulator(TRIANGLE_TRIPLET_COUNT, new 
LongCounter(triangleTripletCount));
+                       addAccumulator(RECTANGLE_TRIPLET_COUNT, new 
LongCounter(rectangleTripletCount));
+                       addAccumulator(MAXIMUM_TRIANGLE_TRIPLETS, new 
LongMaximum(maximumTriangleTriplets));
+                       addAccumulator(MAXIMUM_RECTANGLE_TRIPLETS, new 
LongMaximum(maximumRectangleTriplets));
                }
        }
 
@@ -320,59 +274,17 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         * Wraps edge metrics.
         */
        public static class Result {
-               private long vertexCount;
-               private long edgeCount;
                private long triangleTripletCount;
                private long rectangleTripletCount;
-               private long tripletCount;
-               private long maximumDegree;
-               private long maximumOutDegree;
-               private long maximumInDegree;
                private long maximumTriangleTriplets;
                private long maximumRectangleTriplets;
-               private long maximumTriplets;
 
-               public Result(long vertexCount, long edgeCount, long 
triangleTripletCount, long rectangleTripletCount, long tripletCount,
-                               long maximumDegree, long maximumOutDegree, long 
maximumInDegree,
-                               long maximumTriangleTriplets, long 
maximumRectangleTriplets, long maximumTriplets) {
-                       this.vertexCount = vertexCount;
-                       this.edgeCount = edgeCount;
+               public Result(long triangleTripletCount, long 
rectangleTripletCount,
+                               long maximumTriangleTriplets, long 
maximumRectangleTriplets) {
                        this.triangleTripletCount = triangleTripletCount;
                        this.rectangleTripletCount = rectangleTripletCount;
-                       this.tripletCount = tripletCount;
-                       this.maximumDegree = maximumDegree;
-                       this.maximumOutDegree = maximumOutDegree;
-                       this.maximumInDegree = maximumInDegree;
                        this.maximumTriangleTriplets = maximumTriangleTriplets;
                        this.maximumRectangleTriplets = 
maximumRectangleTriplets;
-                       this.maximumTriplets = maximumTriplets;
-               }
-
-               /**
-                * Get the number of vertices.
-                *
-                * @return number of vertices
-                */
-               public long getNumberOfVertices() {
-                       return vertexCount;
-               }
-
-               /**
-                * Get the number of edges.
-                *
-                * @return number of edges
-                */
-               public long getNumberOfEdges() {
-                       return edgeCount;
-               }
-
-               /**
-                * Get the average degree.
-                *
-                * @return average degree
-                */
-               public float getAverageDegree() {
-                       return edgeCount / (float)vertexCount;
                }
 
                /**
@@ -394,42 +306,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                }
 
                /**
-                * Get the number of triplets.
-                *
-                * @return number of triplets
-                */
-               public long getNumberOfTriplets() {
-                       return tripletCount;
-               }
-
-               /**
-                * Get the maximum degree.
-                *
-                * @return maximum degree
-                */
-               public long getMaximumDegree() {
-                       return maximumDegree;
-               }
-
-               /**
-                * Get the maximum out degree.
-                *
-                * @return maximum out degree
-                */
-               public long getMaximumOutDegree() {
-                       return maximumOutDegree;
-               }
-
-               /**
-                * Get the maximum in degree.
-                *
-                * @return maximum in degree
-                */
-               public long getMaximumInDegree() {
-                       return maximumInDegree;
-               }
-
-               /**
                 * Get the maximum triangle triplets.
                 *
                 * @return maximum triangle triplets
@@ -447,47 +323,23 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        return maximumRectangleTriplets;
                }
 
-               /**
-                * Get the maximum triplets.
-                *
-                * @return maximum triplets
-                */
-               public long getMaximumTriplets() {
-                       return maximumTriplets;
-               }
-
                @Override
                public String toString() {
                        NumberFormat nf = NumberFormat.getInstance();
 
-                       return "vertex count: " + nf.format(vertexCount)
-                               + "; edge count: " + nf.format(edgeCount)
-                               + "; average degree: " + 
nf.format(getAverageDegree())
-                               + "; triangle triplet count: " + 
nf.format(triangleTripletCount)
+                       return "triangle triplet count: " + 
nf.format(triangleTripletCount)
                                + "; rectangle triplet count: " + 
nf.format(rectangleTripletCount)
-                               + "; triplet count: " + nf.format(tripletCount)
-                               + "; maximum degree: " + 
nf.format(maximumDegree)
-                               + "; maximum out degree: " + 
nf.format(maximumOutDegree)
-                               + "; maximum in degree: " + 
nf.format(maximumInDegree)
                                + "; maximum triangle triplets: " + 
nf.format(maximumTriangleTriplets)
-                               + "; maximum rectangle triplets: " + 
nf.format(maximumRectangleTriplets)
-                               + "; maximum triplets: " + 
nf.format(maximumTriplets);
+                               + "; maximum rectangle triplets: " + 
nf.format(maximumRectangleTriplets);
                }
 
                @Override
                public int hashCode() {
                        return new HashCodeBuilder()
-                               .append(vertexCount)
-                               .append(edgeCount)
                                .append(triangleTripletCount)
                                .append(rectangleTripletCount)
-                               .append(tripletCount)
-                               .append(maximumDegree)
-                               .append(maximumOutDegree)
-                               .append(maximumInDegree)
                                .append(maximumTriangleTriplets)
                                .append(maximumRectangleTriplets)
-                               .append(maximumTriplets)
                                .hashCode();
                }
 
@@ -500,17 +352,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        Result rhs = (Result)obj;
 
                        return new EqualsBuilder()
-                               .append(vertexCount, rhs.vertexCount)
-                               .append(edgeCount, rhs.edgeCount)
                                .append(triangleTripletCount, 
rhs.triangleTripletCount)
                                .append(rectangleTripletCount, 
rhs.rectangleTripletCount)
-                               .append(tripletCount, rhs.tripletCount)
-                               .append(maximumDegree, rhs.maximumDegree)
-                               .append(maximumOutDegree, rhs.maximumOutDegree)
-                               .append(maximumInDegree, rhs.maximumInDegree)
                                .append(maximumTriangleTriplets, 
rhs.maximumTriangleTriplets)
                                .append(maximumRectangleTriplets, 
rhs.maximumRectangleTriplets)
-                               .append(maximumTriplets, rhs.maximumTriplets)
                                .isEquals();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 909eea5..1285bb9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -20,20 +20,17 @@ package org.apache.flink.graph.library.metric.directed;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
-import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
 import org.apache.flink.types.CopyableValue;
-import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
 import java.text.NumberFormat;
@@ -44,6 +41,9 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Compute the following vertex metrics in a directed graph:
  *  - number of vertices
  *  - number of edges
+ *  - number of unidirectional edges
+ *  - number of bidirectional edges
+ *  - average degree
  *  - number of triplets
  *  - maximum degree
  *  - maximum out degree
@@ -57,7 +57,23 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-       private String id = new AbstractID().toString();
+       private static final String VERTEX_COUNT = "vertexCount";
+
+       private static final String UNIDIRECTIONAL_EDGE_COUNT = 
"unidirectionalEdgeCount";
+
+       private static final String BIDIRECTIONAL_EDGE_COUNT = 
"bidirectionalEdgeCount";
+
+       private static final String TRIPLET_COUNT = "tripletCount";
+
+       private static final String MAXIMUM_DEGREE = "maximumDegree";
+
+       private static final String MAXIMUM_OUT_DEGREE = "maximumOutDegree";
+
+       private static final String MAXIMUM_IN_DEGREE = "maximumInDegree";
+
+       private static final String MAXIMUM_TRIPLETS = "maximumTriplets";
+
+       private VertexMetricsHelper<K> vertexMetricsHelper;
 
        // Optional configuration
        private boolean includeZeroDegreeVertices = false;
@@ -101,8 +117,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
                                .setParallelism(parallelism));
 
+               vertexMetricsHelper = new VertexMetricsHelper<>();
+
                vertexDegree
-                       .output(new VertexMetricsHelper<K>(id))
+                       .output(vertexMetricsHelper)
                                .name("Vertex metrics");
 
                return this;
@@ -110,17 +128,18 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
        @Override
        public Result getResult() {
-               JobExecutionResult res = env.getLastJobExecutionResult();
-
-               long vertexCount = res.getAccumulatorResult(id + "-0");
-               long edgeCount = res.getAccumulatorResult(id + "-1");
-               long tripletCount = res.getAccumulatorResult(id + "-2");
-               long maximumDegree = res.getAccumulatorResult(id + "-3");
-               long maximumOutDegree = res.getAccumulatorResult(id + "-4");
-               long maximumInDegree = res.getAccumulatorResult(id + "-5");
-               long maximumTriplets = res.getAccumulatorResult(id + "-6");
-
-               return new Result(vertexCount, edgeCount, tripletCount, 
maximumDegree, maximumOutDegree, maximumInDegree, maximumTriplets);
+               long vertexCount = vertexMetricsHelper.getAccumulator(env, 
VERTEX_COUNT);
+               long unidirectionalEdgeCount = 
vertexMetricsHelper.getAccumulator(env, UNIDIRECTIONAL_EDGE_COUNT);
+               long bidirectionalEdgeCount = 
vertexMetricsHelper.getAccumulator(env, BIDIRECTIONAL_EDGE_COUNT);
+               long tripletCount = vertexMetricsHelper.getAccumulator(env, 
TRIPLET_COUNT);
+               long maximumDegree = vertexMetricsHelper.getAccumulator(env, 
MAXIMUM_DEGREE);
+               long maximumOutDegree = vertexMetricsHelper.getAccumulator(env, 
MAXIMUM_OUT_DEGREE);
+               long maximumInDegree = vertexMetricsHelper.getAccumulator(env, 
MAXIMUM_IN_DEGREE);
+               long maximumTriplets = vertexMetricsHelper.getAccumulator(env, 
MAXIMUM_TRIPLETS);
+
+               // each edge is counted twice, once from each vertex, so must 
be halved
+               return new Result(vertexCount, unidirectionalEdgeCount / 2, 
bidirectionalEdgeCount / 2, tripletCount,
+                       maximumDegree, maximumOutDegree, maximumInDegree, 
maximumTriplets);
        }
 
        /**
@@ -129,45 +148,28 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         * @param <T> ID type
         */
        private static class VertexMetricsHelper<T>
-       extends RichOutputFormat<Vertex<T, Degrees>> {
-               private final String id;
-
+       extends AnalyticHelper<Vertex<T, Degrees>> {
                private long vertexCount;
-               private long edgeCount;
+               private long unidirectionalEdgeCount;
+               private long bidirectionalEdgeCount;
                private long tripletCount;
                private long maximumDegree;
                private long maximumOutDegree;
                private long maximumInDegree;
                private long maximumTriplets;
 
-               /**
-                * This helper class collects vertex metrics by scanning over 
and
-                * discarding elements from the given DataSet.
-                *
-                * The unique id is required because Flink's accumulator 
namespace is
-                * shared among all operators.
-                *
-                * @param id unique string used for accumulator names
-                */
-               public VertexMetricsHelper(String id) {
-                       this.id = id;
-               }
-
-               @Override
-               public void configure(Configuration parameters) {}
-
-               @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {}
-
                @Override
                public void writeRecord(Vertex<T, Degrees> record) throws 
IOException {
                        long degree = record.f1.getDegree().getValue();
                        long outDegree = record.f1.getOutDegree().getValue();
                        long inDegree = record.f1.getInDegree().getValue();
+
+                       long bidirectionalEdges = outDegree + inDegree - degree;
                        long triplets = degree * (degree - 1) / 2;
 
                        vertexCount++;
-                       edgeCount += outDegree;
+                       unidirectionalEdgeCount += degree - bidirectionalEdges;
+                       bidirectionalEdgeCount += bidirectionalEdges;
                        tripletCount += triplets;
                        maximumDegree = Math.max(maximumDegree, degree);
                        maximumOutDegree = Math.max(maximumOutDegree, 
outDegree);
@@ -177,13 +179,14 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                @Override
                public void close() throws IOException {
-                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
-                       getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
-                       getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(tripletCount));
-                       getRuntimeContext().addAccumulator(id + "-3", new 
LongMaximum(maximumDegree));
-                       getRuntimeContext().addAccumulator(id + "-4", new 
LongMaximum(maximumOutDegree));
-                       getRuntimeContext().addAccumulator(id + "-5", new 
LongMaximum(maximumInDegree));
-                       getRuntimeContext().addAccumulator(id + "-6", new 
LongMaximum(maximumTriplets));
+                       addAccumulator(VERTEX_COUNT, new 
LongCounter(vertexCount));
+                       addAccumulator(UNIDIRECTIONAL_EDGE_COUNT, new 
LongCounter(unidirectionalEdgeCount));
+                       addAccumulator(BIDIRECTIONAL_EDGE_COUNT, new 
LongCounter(bidirectionalEdgeCount));
+                       addAccumulator(TRIPLET_COUNT, new 
LongCounter(tripletCount));
+                       addAccumulator(MAXIMUM_DEGREE, new 
LongMaximum(maximumDegree));
+                       addAccumulator(MAXIMUM_OUT_DEGREE, new 
LongMaximum(maximumOutDegree));
+                       addAccumulator(MAXIMUM_IN_DEGREE, new 
LongMaximum(maximumInDegree));
+                       addAccumulator(MAXIMUM_TRIPLETS, new 
LongMaximum(maximumTriplets));
                }
        }
 
@@ -192,16 +195,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         */
        public static class Result {
                private long vertexCount;
-               private long edgeCount;
+               private long unidirectionalEdgeCount;
+               private long bidirectionalEdgeCount;
                private long tripletCount;
                private long maximumDegree;
                private long maximumOutDegree;
                private long maximumInDegree;
                private long maximumTriplets;
 
-               public Result(long vertexCount, long edgeCount, long 
tripletCount, long maximumDegree, long maximumOutDegree, long maximumInDegree, 
long maximumTriplets) {
+               public Result(long vertexCount, long unidirectionalEdgeCount, 
long bidirectionalEdgeCount, long tripletCount,
+                               long maximumDegree, long maximumOutDegree, long 
maximumInDegree, long maximumTriplets) {
                        this.vertexCount = vertexCount;
-                       this.edgeCount = edgeCount;
+                       this.unidirectionalEdgeCount = unidirectionalEdgeCount;
+                       this.bidirectionalEdgeCount = bidirectionalEdgeCount;
                        this.tripletCount = tripletCount;
                        this.maximumDegree = maximumDegree;
                        this.maximumOutDegree = maximumOutDegree;
@@ -224,7 +230,25 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                 * @return number of edges
                 */
                public long getNumberOfEdges() {
-                       return edgeCount;
+                       return unidirectionalEdgeCount + 2 * 
bidirectionalEdgeCount;
+               }
+
+               /**
+                * Get the number of unidirectional edges.
+                *
+                * @return number of unidirectional edges
+                */
+               public long getNumberOfDirectedEdges() {
+                       return unidirectionalEdgeCount;
+               }
+
+               /**
+                * Get the number of bidirectional edges.
+                *
+                * @return number of bidirectional edges
+                */
+               public long getNumberOfUndirectedEdges() {
+                       return bidirectionalEdgeCount;
                }
 
                /**
@@ -233,7 +257,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                 * @return average degree
                 */
                public float getAverageDegree() {
-                       return edgeCount / (float)vertexCount;
+                       return getNumberOfEdges() / (float)vertexCount;
                }
 
                /**
@@ -286,7 +310,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        NumberFormat nf = NumberFormat.getInstance();
 
                        return "vertex count: " + nf.format(vertexCount)
-                               + "; edge count: " + nf.format(edgeCount)
+                               + "; edge count: " + 
nf.format(getNumberOfEdges())
+                               + "; unidirectional edge count: " + 
nf.format(unidirectionalEdgeCount)
+                               + "; bidirectional edge count: " + 
nf.format(bidirectionalEdgeCount)
                                + "; average degree: " + 
nf.format(getAverageDegree())
                                + "; triplet count: " + nf.format(tripletCount)
                                + "; maximum degree: " + 
nf.format(maximumDegree)
@@ -299,7 +325,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                public int hashCode() {
                        return new HashCodeBuilder()
                                .append(vertexCount)
-                               .append(edgeCount)
+                               .append(unidirectionalEdgeCount)
+                               .append(bidirectionalEdgeCount)
                                .append(tripletCount)
                                .append(maximumDegree)
                                .append(maximumOutDegree)
@@ -318,7 +345,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                        return new EqualsBuilder()
                                .append(vertexCount, rhs.vertexCount)
-                               .append(edgeCount, rhs.edgeCount)
+                               .append(unidirectionalEdgeCount, 
rhs.unidirectionalEdgeCount)
+                               .append(bidirectionalEdgeCount, 
rhs.bidirectionalEdgeCount)
                                .append(tripletCount, rhs.tripletCount)
                                .append(maximumDegree, rhs.maximumDegree)
                                .append(maximumOutDegree, rhs.maximumOutDegree)

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 1c636ff..a4deeaf 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -20,25 +20,22 @@ package org.apache.flink.graph.library.metric.undirected;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.io.RichOutputFormat;
 import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
 import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
 import java.text.NumberFormat;
@@ -47,15 +44,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Compute the following edge metrics in an undirected graph:
- *  - number of vertices
- *  - number of edges
  *  - number of triangle triplets
  *  - number of rectangle triplets
- *  - number of triplets
- *  - maximum degree
  *  - maximum number of triangle triplets
  *  - maximum number of rectangle triplets
- *  - maximum number of triplets
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -64,7 +56,15 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-       private String id = new AbstractID().toString();
+       private static final String TRIANGLE_TRIPLET_COUNT = 
"triangleTripletCount";
+
+       private static final String RECTANGLE_TRIPLET_COUNT = 
"rectangleTripletCount";
+
+       private static final String MAXIMUM_TRIANGLE_TRIPLETS = 
"maximumTriangleTriplets";
+
+       private static final String MAXIMUM_RECTANGLE_TRIPLETS = 
"maximumRectangleTriplets";
+
+       private EdgeMetricsHelper<K> edgeMetricsHelper;
 
        // Optional configuration
        private boolean reduceOnTargetId = false;
@@ -127,8 +127,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .setParallelism(parallelism)
                                .name("Sum edge stats");
 
+               edgeMetricsHelper = new EdgeMetricsHelper<>();
+
                edgeStats
-                       .output(new EdgeMetricsHelper<K, EV>(id))
+                       .output(edgeMetricsHelper)
                                .setParallelism(parallelism)
                                .name("Edge metrics");
 
@@ -137,20 +139,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
        @Override
        public Result getResult() {
-               JobExecutionResult res = env.getLastJobExecutionResult();
-
-               long vertexCount = res.getAccumulatorResult(id + "-0");
-               long edgeCount = res.getAccumulatorResult(id + "-1");
-               long triangleTripletCount = res.getAccumulatorResult(id + "-2");
-               long rectangleTripletCount = res.getAccumulatorResult(id + 
"-3");
-               long tripletCount = res.getAccumulatorResult(id + "-4");
-               long maximumDegree = res.getAccumulatorResult(id + "-5");
-               long maximumTriangleTriplets = res.getAccumulatorResult(id + 
"-6");
-               long maximumRectangleTriplets = res.getAccumulatorResult(id + 
"-7");
-               long maximumTriplets = res.getAccumulatorResult(id + "-8");
-
-               return new Result(vertexCount, edgeCount / 2, 
triangleTripletCount, rectangleTripletCount, tripletCount,
-                       maximumDegree, maximumTriangleTriplets, 
maximumRectangleTriplets, maximumTriplets);
+               long triangleTripletCount = 
edgeMetricsHelper.getAccumulator(env, TRIANGLE_TRIPLET_COUNT);
+               long rectangleTripletCount = 
edgeMetricsHelper.getAccumulator(env, RECTANGLE_TRIPLET_COUNT);
+               long maximumTriangleTriplets = 
edgeMetricsHelper.getAccumulator(env, MAXIMUM_TRIANGLE_TRIPLETS);
+               long maximumRectangleTriplets = 
edgeMetricsHelper.getAccumulator(env, MAXIMUM_RECTANGLE_TRIPLETS);
+
+               return new Result(triangleTripletCount, rectangleTripletCount,
+                       maximumTriangleTriplets, maximumRectangleTriplets);
        }
 
        /**
@@ -215,71 +210,36 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         *
         * @param <T> ID type
         */
-       private static class EdgeMetricsHelper<T extends Comparable<T>, ET>
-       extends RichOutputFormat<Tuple3<T, LongValue, LongValue>> {
-               private final String id;
-
-               private long vertexCount;
-               private long edgeCount;
+       private static class EdgeMetricsHelper<T extends Comparable<T>>
+       extends AnalyticHelper<Tuple3<T, LongValue, LongValue>> {
                private long triangleTripletCount;
                private long rectangleTripletCount;
-               private long tripletCount;
-               private long maximumDegree;
                private long maximumTriangleTriplets;
                private long maximumRectangleTriplets;
-               private long maximumTriplets;
-
-               /**
-                * This helper class collects edge metrics by scanning over and
-                * discarding elements from the given DataSet.
-                *
-                * The unique id is required because Flink's accumulator 
namespace is
-                * among all operators.
-                *
-                * @param id unique string used for accumulator names
-                */
-               public EdgeMetricsHelper(String id) {
-                       this.id = id;
-               }
-
-               @Override
-               public void configure(Configuration parameters) {}
-
-               @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {}
 
                @Override
                public void writeRecord(Tuple3<T, LongValue, LongValue> record) 
throws IOException {
                        long degree = record.f1.getValue();
+
                        long lowDegree = record.f2.getValue();
                        long highDegree = degree - lowDegree;
 
                        long triangleTriplets = lowDegree * (lowDegree - 1) / 2;
                        long rectangleTriplets = triangleTriplets + lowDegree * 
highDegree;
-                       long triplets = degree * (degree - 1) / 2;
 
-                       vertexCount++;
-                       edgeCount += degree;
                        triangleTripletCount += triangleTriplets;
                        rectangleTripletCount += rectangleTriplets;
-                       tripletCount += triplets;
-                       maximumDegree = Math.max(maximumDegree, degree);
+
                        maximumTriangleTriplets = 
Math.max(maximumTriangleTriplets, triangleTriplets);
                        maximumRectangleTriplets = 
Math.max(maximumRectangleTriplets, rectangleTriplets);
-                       maximumTriplets = Math.max(maximumTriplets, triplets);
                }
 
                @Override
                public void close() throws IOException {
-                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
-                       getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
-                       getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(triangleTripletCount));
-                       getRuntimeContext().addAccumulator(id + "-3", new 
LongCounter(rectangleTripletCount));
-                       getRuntimeContext().addAccumulator(id + "-4", new 
LongCounter(tripletCount));
-                       getRuntimeContext().addAccumulator(id + "-5", new 
LongMaximum(maximumDegree));
-                       getRuntimeContext().addAccumulator(id + "-6", new 
LongMaximum(maximumTriangleTriplets));
-                       getRuntimeContext().addAccumulator(id + "-7", new 
LongMaximum(maximumRectangleTriplets));
-                       getRuntimeContext().addAccumulator(id + "-8", new 
LongMaximum(maximumTriplets));
+                       addAccumulator(TRIANGLE_TRIPLET_COUNT, new 
LongCounter(triangleTripletCount));
+                       addAccumulator(RECTANGLE_TRIPLET_COUNT, new 
LongCounter(rectangleTripletCount));
+                       addAccumulator(MAXIMUM_TRIANGLE_TRIPLETS, new 
LongMaximum(maximumTriangleTriplets));
+                       addAccumulator(MAXIMUM_RECTANGLE_TRIPLETS, new 
LongMaximum(maximumRectangleTriplets));
                }
        }
 
@@ -287,54 +247,17 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         * Wraps edge metrics.
         */
        public static class Result {
-               private long vertexCount;
-               private long edgeCount;
                private long triangleTripletCount;
                private long rectangleTripletCount;
-               private long tripletCount;
-               private long maximumDegree;
                private long maximumTriangleTriplets;
                private long maximumRectangleTriplets;
-               private long maximumTriplets;
 
-               public Result(long vertexCount, long edgeCount, long 
triangleTripletCount, long rectangleTripletCount, long tripletCount,
-                               long maximumDegree, long 
maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) {
-                       this.vertexCount = vertexCount;
-                       this.edgeCount = edgeCount;
+               public Result(long triangleTripletCount, long 
rectangleTripletCount,
+                               long maximumTriangleTriplets, long 
maximumRectangleTriplets) {
                        this.triangleTripletCount = triangleTripletCount;
                        this.rectangleTripletCount = rectangleTripletCount;
-                       this.tripletCount = tripletCount;
-                       this.maximumDegree = maximumDegree;
                        this.maximumTriangleTriplets = maximumTriangleTriplets;
                        this.maximumRectangleTriplets = 
maximumRectangleTriplets;
-                       this.maximumTriplets = maximumTriplets;
-               }
-
-               /**
-                * Get the number of vertices.
-                *
-                * @return number of vertices
-                */
-               public long getNumberOfVertices() {
-                       return vertexCount;
-               }
-
-               /**
-                * Get the number of edges.
-                *
-                * @return number of edges
-                */
-               public long getNumberOfEdges() {
-                       return edgeCount;
-               }
-
-               /**
-                * Get the average degree.
-                *
-                * @return average degree
-                */
-               public float getAverageDegree() {
-                       return edgeCount / (float)vertexCount;
                }
 
                /**
@@ -356,24 +279,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                }
 
                /**
-                * Get the number of triplets.
-                *
-                * @return number of triplets
-                */
-               public long getNumberOfTriplets() {
-                       return tripletCount;
-               }
-
-               /**
-                * Get the maximum degree.
-                *
-                * @return maximum degree
-                */
-               public long getMaximumDegree() {
-                       return maximumDegree;
-               }
-
-               /**
                 * Get the maximum triangle triplets.
                 *
                 * @return maximum triangle triplets
@@ -391,43 +296,23 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        return maximumRectangleTriplets;
                }
 
-               /**
-                * Get the maximum triplets.
-                *
-                * @return maximum triplets
-                */
-               public long getMaximumTriplets() {
-                       return maximumTriplets;
-               }
-
                @Override
                public String toString() {
                        NumberFormat nf = NumberFormat.getInstance();
 
-                       return "vertex count: " + nf.format(vertexCount)
-                               + "; edge count: " + nf.format(edgeCount)
-                               + "; average degree: " + 
nf.format(getAverageDegree())
-                               + "; triangle triplet count: " + 
nf.format(triangleTripletCount)
+                       return "triangle triplet count: " + 
nf.format(triangleTripletCount)
                                + "; rectangle triplet count: " + 
nf.format(rectangleTripletCount)
-                               + "; triplet count: " + nf.format(tripletCount)
-                               + "; maximum degree: " + 
nf.format(maximumDegree)
                                + "; maximum triangle triplets: " + 
nf.format(maximumTriangleTriplets)
-                               + "; maximum rectangle triplets: " + 
nf.format(maximumRectangleTriplets)
-                               + "; maximum triplets: " + 
nf.format(maximumTriplets);
+                               + "; maximum rectangle triplets: " + 
nf.format(maximumRectangleTriplets);
                }
 
                @Override
                public int hashCode() {
                        return new HashCodeBuilder()
-                               .append(vertexCount)
-                               .append(edgeCount)
                                .append(triangleTripletCount)
                                .append(rectangleTripletCount)
-                               .append(tripletCount)
-                               .append(maximumDegree)
                                .append(maximumTriangleTriplets)
                                .append(maximumRectangleTriplets)
-                               .append(maximumTriplets)
                                .hashCode();
                }
 
@@ -440,15 +325,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                        Result rhs = (Result)obj;
 
                        return new EqualsBuilder()
-                               .append(vertexCount, rhs.vertexCount)
-                               .append(edgeCount, rhs.edgeCount)
                                .append(triangleTripletCount, 
rhs.triangleTripletCount)
                                .append(rectangleTripletCount, 
rhs.rectangleTripletCount)
-                               .append(tripletCount, rhs.tripletCount)
-                               .append(maximumDegree, rhs.maximumDegree)
                                .append(maximumTriangleTriplets, 
rhs.maximumTriangleTriplets)
                                .append(maximumRectangleTriplets, 
rhs.maximumRectangleTriplets)
-                               .append(maximumTriplets, rhs.maximumTriplets)
                                .isEquals();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 8012605..ee67129 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -20,20 +20,17 @@ package org.apache.flink.graph.library.metric.undirected;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
-import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
 import java.text.NumberFormat;
@@ -44,6 +41,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Compute the following vertex metrics in an undirected graph:
  *  - number of vertices
  *  - number of edges
+ *  - average degree
  *  - number of triplets
  *  - maximum degree
  *  - maximum number of triplets
@@ -55,7 +53,17 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-       private String id = new AbstractID().toString();
+       private static final String VERTEX_COUNT = "vertexCount";
+
+       private static final String EDGE_COUNT = "edgeCount";
+
+       private static final String TRIPLET_COUNT = "tripletCount";
+
+       private static final String MAXIMUM_DEGREE = "maximumDegree";
+
+       private static final String MAXIMUM_TRIPLETS = "maximumTriplets";
+
+       private VertexMetricsHelper<K> vertexMetricsHelper;
 
        // Optional configuration
        private boolean includeZeroDegreeVertices = false;
@@ -117,8 +125,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .setReduceOnTargetId(reduceOnTargetId)
                                .setParallelism(parallelism));
 
+               vertexMetricsHelper = new VertexMetricsHelper<>();
+
                vertexDegree
-                       .output(new VertexMetricsHelper<K>(id))
+                       .output(vertexMetricsHelper)
                                .name("Vertex metrics");
 
                return this;
@@ -126,14 +136,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
        @Override
        public Result getResult() {
-               JobExecutionResult res = env.getLastJobExecutionResult();
-
-               long vertexCount = res.getAccumulatorResult(id + "-0");
-               long edgeCount = res.getAccumulatorResult(id + "-1");
-               long tripletCount = res.getAccumulatorResult(id + "-2");
-               long maximumDegree = res.getAccumulatorResult(id + "-3");
-               long maximumTriplets = res.getAccumulatorResult(id + "-4");
+               long vertexCount = vertexMetricsHelper.getAccumulator(env, 
VERTEX_COUNT);
+               long edgeCount = vertexMetricsHelper.getAccumulator(env, 
EDGE_COUNT);
+               long tripletCount = vertexMetricsHelper.getAccumulator(env, 
TRIPLET_COUNT);
+               long maximumDegree = vertexMetricsHelper.getAccumulator(env, 
MAXIMUM_DEGREE);
+               long maximumTriplets = vertexMetricsHelper.getAccumulator(env, 
MAXIMUM_TRIPLETS);
 
+               // each edge is counted twice, once from each vertex, so must 
be halved
                return new Result(vertexCount, edgeCount / 2, tripletCount, 
maximumDegree, maximumTriplets);
        }
 
@@ -143,34 +152,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         * @param <T> ID type
         */
        private static class VertexMetricsHelper<T>
-       extends RichOutputFormat<Vertex<T, LongValue>> {
-               private final String id;
-
+       extends AnalyticHelper<Vertex<T, LongValue>> {
                private long vertexCount;
                private long edgeCount;
                private long tripletCount;
                private long maximumDegree;
                private long maximumTriplets;
 
-               /**
-                * This helper class collects vertex metrics by scanning over 
and
-                * discarding elements from the given DataSet.
-                *
-                * The unique id is required because Flink's accumulator 
namespace is
-                * shared among all operators.
-                *
-                * @param id unique string used for accumulator names
-                */
-               public VertexMetricsHelper(String id) {
-                       this.id = id;
-               }
-
-               @Override
-               public void configure(Configuration parameters) {}
-
-               @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {}
-
                @Override
                public void writeRecord(Vertex<T, LongValue> record) throws 
IOException {
                        long degree = record.f1.getValue();
@@ -185,11 +173,11 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                @Override
                public void close() throws IOException {
-                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
-                       getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
-                       getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(tripletCount));
-                       getRuntimeContext().addAccumulator(id + "-3", new 
LongMaximum(maximumDegree));
-                       getRuntimeContext().addAccumulator(id + "-4", new 
LongMaximum(maximumTriplets));
+                       addAccumulator(VERTEX_COUNT, new 
LongCounter(vertexCount));
+                       addAccumulator(EDGE_COUNT, new LongCounter(edgeCount));
+                       addAccumulator(TRIPLET_COUNT, new 
LongCounter(tripletCount));
+                       addAccumulator(MAXIMUM_DEGREE, new 
LongMaximum(maximumDegree));
+                       addAccumulator(MAXIMUM_TRIPLETS, new 
LongMaximum(maximumTriplets));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
index af5a154..9711cc0 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
@@ -34,7 +34,7 @@ extends AsmTestBase {
        @Test
        public void testWithSimpleGraph()
                        throws Exception {
-               Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 2, 3, 1, 
3, 6);
+               Result expectedResult = new Result(2, 6, 1, 3);
 
                Result edgeMetrics = new EdgeMetrics<IntValue, NullValue, 
NullValue>()
                        .run(directedSimpleGraph)
@@ -47,13 +47,11 @@ extends AsmTestBase {
        public void testWithCompleteGraph()
                        throws Exception {
                long expectedDegree = completeGraphVertexCount - 1;
-               long expectedEdges = completeGraphVertexCount * expectedDegree;
                long expectedMaximumTriplets = 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
                long expectedTriplets = completeGraphVertexCount * 
expectedMaximumTriplets;
 
-               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets,
-                       expectedDegree, expectedDegree, expectedDegree,
-                       expectedMaximumTriplets, expectedMaximumTriplets, 
expectedMaximumTriplets);
+               Result expectedResult = new Result(expectedTriplets / 3, 2 * 
expectedTriplets / 3,
+                       expectedMaximumTriplets, expectedMaximumTriplets);
 
                Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, 
NullValue>()
                        .run(completeGraph)
@@ -67,7 +65,7 @@ extends AsmTestBase {
                        throws Exception {
                Result expectedResult;
 
-               expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+               expectedResult = new Result(0, 0, 0, 0);
 
                Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, 
NullValue, NullValue>()
                        .run(emptyGraph)
@@ -79,7 +77,7 @@ extends AsmTestBase {
        @Test
        public void testWithRMatGraph()
                        throws Exception {
-               Result expectedResult = new Result(902, 12009, 107817, 315537, 
1003442, 463, 334, 342, 820, 3822, 106953);
+               Result expectedResult = new Result(107817, 315537, 820, 3822);
 
                Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, 
NullValue, NullValue>()
                        .run(directedRMatGraph)

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
index e4362c0..c4ec8f8 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
@@ -34,7 +34,7 @@ extends AsmTestBase {
        @Test
        public void testWithSimpleGraph()
                        throws Exception {
-               Result expectedResult = new Result(6, 7, 13, 4, 2, 3, 6);
+               Result expectedResult = new Result(6, 7, 0, 13, 4, 2, 3, 6);
 
                Result vertexMetrics = new VertexMetrics<IntValue, NullValue, 
NullValue>()
                        .run(directedSimpleGraph)
@@ -47,11 +47,12 @@ extends AsmTestBase {
        public void testWithCompleteGraph()
                        throws Exception {
                long expectedDegree = completeGraphVertexCount - 1;
-               long expectedEdges = completeGraphVertexCount * expectedDegree;
+               long expectedBidirectionalEdges = completeGraphVertexCount * 
expectedDegree / 2;
                long expectedMaximumTriplets = 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
                long expectedTriplets = completeGraphVertexCount * 
expectedMaximumTriplets;
 
-               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets, expectedDegree, expectedDegree, 
expectedDegree, expectedMaximumTriplets);
+               Result expectedResult = new Result(completeGraphVertexCount, 0, 
expectedBidirectionalEdges,
+                       expectedTriplets, expectedDegree, expectedDegree, 
expectedDegree, expectedMaximumTriplets);
 
                Result vertexMetrics = new VertexMetrics<LongValue, NullValue, 
NullValue>()
                        .run(completeGraph)
@@ -65,7 +66,7 @@ extends AsmTestBase {
                        throws Exception {
                Result expectedResult;
 
-               expectedResult = new Result(0, 0, 0, 0, 0, 0, 0);
+               expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0);
 
                Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .setIncludeZeroDegreeVertices(false)
@@ -74,7 +75,7 @@ extends AsmTestBase {
 
                assertEquals(withoutZeroDegreeVertices, expectedResult);
 
-               expectedResult = new Result(3, 0, 0, 0, 0, 0, 0);
+               expectedResult = new Result(3, 0, 0, 0, 0, 0, 0, 0);
 
                Result withZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .setIncludeZeroDegreeVertices(true)
@@ -87,7 +88,7 @@ extends AsmTestBase {
        @Test
        public void testWithRMatGraph()
                        throws Exception {
-               Result expectedResult = new Result(902, 12009, 1003442, 463, 
334, 342, 106953);
+               Result expectedResult = new Result(902, 8875, 1567, 1003442, 
463, 334, 342, 106953);
 
                Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
                        .run(directedRMatGraph)

http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
index b300d66..cb75331 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
@@ -34,7 +34,7 @@ extends AsmTestBase {
        @Test
        public void testWithSimpleGraph()
                        throws Exception {
-               Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 1, 3, 6);
+               Result expectedResult = new Result(2, 6, 1, 3);
 
                Result edgeMetrics = new EdgeMetrics<IntValue, NullValue, 
NullValue>()
                        .run(undirectedSimpleGraph)
@@ -47,12 +47,11 @@ extends AsmTestBase {
        public void testWithCompleteGraph()
                        throws Exception {
                long expectedDegree = completeGraphVertexCount - 1;
-               long expectedEdges = completeGraphVertexCount * expectedDegree 
/ 2;
                long expectedMaximumTriplets = 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
                long expectedTriplets = completeGraphVertexCount * 
expectedMaximumTriplets;
 
-               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets,
-                       expectedDegree, expectedMaximumTriplets, 
expectedMaximumTriplets, expectedMaximumTriplets);
+               Result expectedResult = new Result(expectedTriplets / 3, 2 * 
expectedTriplets / 3,
+                       expectedMaximumTriplets, expectedMaximumTriplets);
 
                Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, 
NullValue>()
                        .run(completeGraph)
@@ -66,7 +65,7 @@ extends AsmTestBase {
                        throws Exception {
                Result expectedResult;
 
-               expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0);
+               expectedResult = new Result(0, 0, 0, 0);
 
                Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, 
NullValue, NullValue>()
                        .run(emptyGraph)
@@ -78,7 +77,7 @@ extends AsmTestBase {
        @Test
        public void testWithRMatGraph()
                        throws Exception {
-               Result expectedResult = new Result(902, 10442, 107817, 315537, 
1003442, 463, 820, 3822, 106953);
+               Result expectedResult = new Result(107817, 315537, 820, 3822);
 
                Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, 
NullValue, NullValue>()
                        .run(undirectedRMatGraph)

Reply via email to