[FLINK-7273] [gelly] Gelly tests with empty graphs

There exist some tests with empty graphs but the `EmptyGraph` in
`AsmTestBase` contained vertices but no edges. Add a new `EmptyGraph`
without vertices and test both empty graphs for each algorithm.

EmptyGraph now generates the proper TypeInformation (for Edge<> not
Tuple3) which had prevented adding edges due to a union incompatibility.

GraphGeneratorUtils#vertexSet now uses a hash-combine for distinct.

`PageRank` optionally includes zero-degree vertices in the results
(at a performance cost).

This closes #4405


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9437a0ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9437a0ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9437a0ff

Branch: refs/heads/master
Commit: 9437a0ffc04318f6a1a2d19c59f2ae6651b26507
Parents: 6612c0e
Author: Greg Hogan <c...@greghogan.com>
Authored: Wed Jul 26 06:23:52 2017 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Thu Sep 14 23:25:25 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/library_methods.md          |   1 +
 .../apache/flink/graph/drivers/PageRank.java    |   6 +-
 .../flink/graph/drivers/input/GridGraph.java    |   1 -
 .../flink/graph/generator/CirculantGraph.java   |   2 +-
 .../flink/graph/generator/CompleteGraph.java    |   2 +-
 .../flink/graph/generator/CycleGraph.java       |   2 +-
 .../apache/flink/graph/generator/EchoGraph.java |   4 +-
 .../flink/graph/generator/EmptyGraph.java       |  14 ++-
 .../graph/generator/GraphGeneratorUtils.java    |  46 ++++++---
 .../apache/flink/graph/generator/GridGraph.java |   2 +-
 .../flink/graph/generator/HypercubeGraph.java   |   4 +-
 .../apache/flink/graph/generator/PathGraph.java |   2 +-
 .../graph/generator/SingletonEdgeGraph.java     |   6 +-
 .../apache/flink/graph/generator/StarGraph.java |   8 +-
 .../directed/AverageClusteringCoefficient.java  |   5 +
 .../directed/GlobalClusteringCoefficient.java   |   5 +
 .../clustering/directed/TriadicCensus.java      |   5 +
 .../AverageClusteringCoefficient.java           |   5 +
 .../undirected/GlobalClusteringCoefficient.java |   5 +
 .../clustering/undirected/TriadicCensus.java    |   5 +
 .../flink/graph/library/linkanalysis/HITS.java  |  12 ++-
 .../graph/library/linkanalysis/PageRank.java    |  50 ++++++++-
 .../library/metric/directed/EdgeMetrics.java    |   5 +
 .../library/metric/directed/VertexMetrics.java  |   5 +
 .../library/metric/undirected/EdgeMetrics.java  |   5 +
 .../metric/undirected/VertexMetrics.java        |   5 +
 .../graph/library/similarity/AdamicAdar.java    |   4 +-
 .../org/apache/flink/graph/asm/AsmTestBase.java |  25 ++++-
 .../graph/asm/dataset/ChecksumHashCodeTest.java |  19 +++-
 .../flink/graph/asm/dataset/CollectTest.java    |  19 +++-
 .../flink/graph/asm/dataset/CountTest.java      |  18 +++-
 .../annotate/directed/EdgeDegreesPairTest.java  |  25 +++--
 .../directed/EdgeSourceDegreesTest.java         |  25 +++--
 .../directed/EdgeTargetDegreesTest.java         |  25 +++--
 .../annotate/directed/VertexDegreesTest.java    |  40 ++++---
 .../annotate/directed/VertexInDegreeTest.java   |  52 +++++++---
 .../annotate/directed/VertexOutDegreeTest.java  |  52 +++++++---
 .../annotate/undirected/EdgeDegreePairTest.java |  45 ++++++--
 .../undirected/EdgeSourceDegreeTest.java        |  45 ++++++--
 .../undirected/EdgeTargetDegreeTest.java        |  45 ++++++--
 .../annotate/undirected/VertexDegreeTest.java   |  45 +++++---
 .../filter/undirected/MaximumDegreeTest.java    |  28 +++--
 .../graph/asm/simple/directed/SimplifyTest.java |   3 +-
 .../asm/simple/undirected/SimplifyTest.java     |   6 +-
 .../graph/asm/translate/TranslateTest.java      |   9 +-
 .../graph/generator/CirculantGraphTest.java     |  12 +--
 .../graph/generator/CompleteGraphTest.java      |  12 +--
 .../flink/graph/generator/CycleGraphTest.java   |  12 +--
 .../flink/graph/generator/EchoGraphTest.java    |  24 ++---
 .../flink/graph/generator/EmptyGraphTest.java   |  12 +--
 .../flink/graph/generator/GridGraphTest.java    |  12 +--
 .../graph/generator/HypercubeGraphTest.java     |  12 +--
 .../flink/graph/generator/PathGraphTest.java    |  12 +--
 .../flink/graph/generator/RMatGraphTest.java    |   9 +-
 .../graph/generator/SingletonEdgeGraphTest.java |  12 +--
 .../flink/graph/generator/StarGraphTest.java    |  12 +--
 .../apache/flink/graph/generator/TestUtils.java |   9 +-
 .../AverageClusteringCoefficientTest.java       |  74 ++++++-------
 .../GlobalClusteringCoefficientTest.java        |  73 ++++++-------
 .../LocalClusteringCoefficientTest.java         |  60 +++++++----
 .../clustering/directed/TriadicCensusTest.java  |  34 +++---
 .../directed/TriangleListingTest.java           |  28 +++--
 .../AverageClusteringCoefficientTest.java       |  74 ++++++-------
 .../GlobalClusteringCoefficientTest.java        |  73 ++++++-------
 .../LocalClusteringCoefficientTest.java         |  60 +++++++----
 .../undirected/TriadicCensusTest.java           |  28 +++--
 .../undirected/TriangleListingTest.java         |  31 ++++--
 .../graph/library/linkanalysis/HITSTest.java    | 103 +++++++++++--------
 .../library/linkanalysis/PageRankTest.java      |  97 ++++++++++-------
 .../library/metric/ChecksumHashCodeTest.java    |  27 ++++-
 .../metric/directed/EdgeMetricsTest.java        |  70 ++++++-------
 .../metric/directed/VertexMetricsTest.java      |  95 ++++++++---------
 .../metric/undirected/EdgeMetricsTest.java      |  70 ++++++-------
 .../metric/undirected/VertexMetricsTest.java    |  95 ++++++++---------
 .../library/similarity/AdamicAdarTest.java      |  76 +++++++++++---
 .../library/similarity/JaccardIndexTest.java    |  89 ++++++++++++----
 76 files changed, 1320 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md 
b/docs/dev/libs/gelly/library_methods.md
index de53aaf..93a2c5d 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -330,6 +330,7 @@ The algorithm takes a simple directed graph as input and 
outputs a `DataSet` of
 hub score, and authority score. Termination is configured by the number of 
iterations and/or a convergence threshold on
 the iteration sum of the change in scores over all vertices.
 
+* `setIncludeZeroDegreeVertices`: whether to include zero-degree vertices in 
the iterative computation
 * `setParallelism`: override the operator parallelism
 
 ### PageRank

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
index 299aeed..b9997e4 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -20,13 +20,14 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.DoubleParameter;
 import org.apache.flink.graph.drivers.parameter.IterationConvergence;
 
 import org.apache.commons.lang3.text.StrBuilder;
 
 /**
- * @see org.apache.flink.graph.library.linkanalysis.PageRank
+ * Driver for {@link org.apache.flink.graph.library.linkanalysis.PageRank}.
  */
 public class PageRank<K, VV, EV>
 extends DriverBase<K, VV, EV> {
@@ -40,6 +41,8 @@ extends DriverBase<K, VV, EV> {
 
        private IterationConvergence iterationConvergence = new 
IterationConvergence(this, DEFAULT_ITERATIONS);
 
+       private BooleanParameter includeZeroDegreeVertices = new 
BooleanParameter(this, "__include_zero_degree_vertices");
+
        @Override
        public String getShortDescription() {
                return "score vertices by the number and quality of incoming 
links";
@@ -63,6 +66,7 @@ extends DriverBase<K, VV, EV> {
                                        dampingFactor.getValue(),
                                        
iterationConvergence.getValue().iterations,
                                        
iterationConvergence.getValue().convergenceThreshold)
+                               
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.getValue())
                                
.setParallelism(parallelism.getValue().intValue()));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index 1421b1d..fdb144f 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -76,7 +76,6 @@ extends GeneratedGraph {
 
        @Override
        protected long vertexCount() {
-               // in Java 8 use Math.multiplyExact(long, long)
                BigInteger vertexCount = BigInteger.ONE;
                for (Dimension dimension : dimensions) {
                        vertexCount = 
vertexCount.multiply(BigInteger.valueOf(dimension.size));

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
index 1dc9e66..0cb3edd 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
@@ -51,7 +51,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexCount;
+       private final long vertexCount;
 
        private List<OffsetRange> offsetRanges = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index bf7dedf..46d8e67 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexCount;
+       private final long vertexCount;
 
        /**
         * An undirected {@link Graph} connecting every distinct pair of 
vertices.

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
index 5b61fa8..5dad4c8 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexCount;
+       private final long vertexCount;
 
        /**
         * An undirected {@link Graph} with {@code n} vertices where each vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
index d834df1..4baeb25 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
@@ -46,9 +46,9 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexCount;
+       private final long vertexCount;
 
-       private long vertexDegree;
+       private final long vertexDegree;
 
        /**
         * An undirected {@link Graph} whose vertices have the same degree.

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
index 466e2d3..30d0d2f 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.graph.generator;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -39,13 +38,13 @@ import java.util.Collections;
 public class EmptyGraph
 extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
-       public static final int MINIMUM_VERTEX_COUNT = 1;
+       public static final int MINIMUM_VERTEX_COUNT = 0;
 
        // Required to create the DataSource
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexCount;
+       private final long vertexCount;
 
        /**
         * The {@link Graph} containing no edges.
@@ -63,15 +62,14 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> 
{
 
        @Override
        public Graph<LongValue, NullValue, NullValue> generate() {
+               Preconditions.checkState(vertexCount >= 0);
+
                // Vertices
                DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
                // Edges
-               TypeInformation<Edge<LongValue, NullValue>> typeInformation = 
new TupleTypeInfo<>(
-                       ValueTypeInfo.LONG_VALUE_TYPE_INFO, 
ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO);
-
                DataSource<Edge<LongValue, NullValue>> edges = env
-                       .fromCollection(Collections.<Edge<LongValue, 
NullValue>>emptyList(), typeInformation)
+                       .fromCollection(Collections.<Edge<LongValue, 
NullValue>>emptyList(), TypeInformation.of(new TypeHint<Edge<LongValue, 
NullValue>>(){}))
                                .setParallelism(parallelism)
                                .name("Empty edge set");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index d5a70f3..61a27c4 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -20,6 +20,9 @@ package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
@@ -31,6 +34,9 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
 
 /**
  * Utilities for graph generators.
@@ -45,26 +51,34 @@ public class GraphGeneratorUtils {
         * @param env the Flink execution environment.
         * @param parallelism operator parallelism
         * @param vertexCount number of sequential vertex labels
-        * @return {@link DataSet} of sequentially labeled {@link Vertex 
Vertices}
+        * @return {@link DataSet} of sequentially labeled {@link Vertex 
vertices}
         */
        public static DataSet<Vertex<LongValue, NullValue>> 
vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
-               LongValueSequenceIterator iterator = new 
LongValueSequenceIterator(0, vertexCount - 1);
-
-               DataSource<LongValue> vertexLabels = env
-                       .fromParallelCollection(iterator, LongValue.class)
-                               .setParallelism(parallelism)
-                               .name("Vertex iterators");
-
-               return vertexLabels
-                       .map(new CreateVertex())
-                               .setParallelism(parallelism)
-                               .name("Vertex sequence");
+               Preconditions.checkArgument(vertexCount >= 0, "Vertex count 
must be non-negative");
+
+               if (vertexCount == 0) {
+                       return env
+                               .fromCollection(Collections.emptyList(), 
TypeInformation.of(new TypeHint<Vertex<LongValue, NullValue>>(){}))
+                                       .setParallelism(parallelism)
+                                       .name("Empty vertex set");
+               } else {
+                       LongValueSequenceIterator iterator = new 
LongValueSequenceIterator(0, vertexCount - 1);
+
+                       DataSource<LongValue> vertexLabels = env
+                               .fromParallelCollection(iterator, 
LongValue.class)
+                                       .setParallelism(parallelism)
+                                       .name("Vertex indices");
+
+                       return vertexLabels
+                               .map(new CreateVertex())
+                                       .setParallelism(parallelism)
+                                       .name("Vertex sequence");
+               }
        }
 
        @ForwardedFields("*->f0")
        private static class CreateVertex
        implements MapFunction<LongValue, Vertex<LongValue, NullValue>> {
-
                private Vertex<LongValue, NullValue> vertex = new 
Vertex<>(null, NullValue.getInstance());
 
                @Override
@@ -79,13 +93,13 @@ public class GraphGeneratorUtils {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Generates {@link Vertex Vertices} present in the given set of {@link 
Edge}s.
+        * Generates {@link Vertex vertices} present in the given set of {@link 
Edge}s.
         *
         * @param edges source {@link DataSet} of {@link Edge}s
         * @param parallelism operator parallelism
         * @param <K> label type
         * @param <EV> edge value type
-        * @return {@link DataSet} of discovered {@link Vertex Vertices}
+        * @return {@link DataSet} of discovered {@link Vertex vertices}
         *
         * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
         */
@@ -97,6 +111,7 @@ public class GraphGeneratorUtils {
 
                return vertexSet
                        .distinct()
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Emit vertex labels");
        }
@@ -106,7 +121,6 @@ public class GraphGeneratorUtils {
         */
        private static final class EmitSrcAndTarget<K, EV>
        implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
-
                private Vertex<K, NullValue> output = new Vertex<>(null, new 
NullValue());
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index cae2bc4..1d91b53 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -71,7 +71,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        public GridGraph addDimension(long size, boolean wrapEndpoints) {
                Preconditions.checkArgument(size >= 2, "Dimension size must be 
at least 2");
 
-               vertexCount *= size;
+               vertexCount = Math.multiplyExact(vertexCount, size);
 
                // prevent duplicate edges
                if (size == 2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
index daaaf53..0dc95e7 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long dimensions;
+       private final long dimensions;
 
        /**
         * An undirected {@code Graph} where edges form an n-dimensional 
hypercube.
@@ -56,6 +56,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
        @Override
        public Graph<LongValue, NullValue, NullValue> generate() {
+               Preconditions.checkState(dimensions > 0);
+
                GridGraph graph = new GridGraph(env);
 
                for (int i = 0; i < dimensions; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
index e61fcd8..bae8633 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexCount;
+       private final long vertexCount;
 
        /**
         * An undirected {@link Graph} with {@code n} vertices where each vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
index 159e55d..333c051 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -42,7 +42,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexPairCount;
+       private final long vertexPairCount;
 
        /**
         * An undirected {@link Graph} containing one or more isolated 
two-paths.
@@ -62,8 +62,10 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
        @Override
        public Graph<LongValue, NullValue, NullValue> generate() {
+               Preconditions.checkState(vertexPairCount > 0);
+
                // Vertices
-               long vertexCount = 2 * this.vertexPairCount;
+               long vertexCount = 2 * vertexPairCount;
 
                DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index 7133320..5cbb256 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -43,11 +43,11 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> 
{
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private long vertexCount;
+       private final long vertexCount;
 
        /**
-        * An undirected {@Graph} with {@code n} vertices where the single 
central
-        * node has degree {@code n-1}, connecting to the other {@code n-1}
+        * An undirected {@link Graph} with {@code n} vertices where the single
+        * central node has degree {@code n-1}, connecting to the other {@code 
n-1}
         * vertices which have degree {@code 1}.
         *
         * @param env the Flink execution environment
@@ -63,6 +63,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
        @Override
        public Graph<LongValue, NullValue, NullValue> generate() {
+               Preconditions.checkState(vertexCount >= 2);
+
                // Vertices
                DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
index 2ff5dc2..1d3b7c6 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        return "vertex count: " + vertexCount
                                + ", average clustering coefficient: " + 
averageLocalClusteringCoefficient;

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
index a6b0baa..ae15d40 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -137,6 +137,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        return "triplet count: " + tripletCount
                                + ", triangle count: " + triangleCount

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 93eadc5..f99df1c 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -504,6 +504,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index c0ddb05..2885d6e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        return "vertex count: " + vertexCount
                                + ", average clustering coefficient: " + 
averageLocalClusteringCoefficient;

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index ef212ae..1a7314e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -136,6 +136,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        return "triplet count: " + tripletCount
                                + ", triangle count: " + triangleCount

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index f440098..24dcade 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -198,6 +198,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
index e59240b..9ec1eea 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -48,6 +48,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
+import java.util.Iterator;
 
 /**
  * Hyperlink-Induced Topic Search computes two interdependent scores for every
@@ -387,12 +388,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
 
-                       Collection<DoubleValue> var;
-                       var = 
getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
-                       hubbinessRootSumSquared = 
Math.sqrt(var.iterator().next().getValue());
+                       Collection<DoubleValue> hubbinessSumSquared = 
getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
+                       Iterator<DoubleValue> hubbinessSumSquaredIterator = 
hubbinessSumSquared.iterator();
+                       this.hubbinessRootSumSquared = 
hubbinessSumSquaredIterator.hasNext() ? 
Math.sqrt(hubbinessSumSquaredIterator.next().getValue()) : Double.NaN;
 
-                       var = 
getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
-                       authorityRootSumSquared = 
Math.sqrt(var.iterator().next().getValue());
+                       Collection<DoubleValue> authoritySumSquared = 
getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
+                       Iterator<DoubleValue> authoritySumSquaredIterator = 
authoritySumSquared.iterator();
+                       authorityRootSumSquared = 
authoritySumSquaredIterator.hasNext() ? 
Math.sqrt(authoritySumSquaredIterator.next().getValue()) : Double.NaN;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index 71c37aa..d259fac 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -84,6 +84,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        private double convergenceThreshold;
 
+       // Optional configuration
+       private boolean includeZeroDegreeVertices = false;
+
        /**
         * PageRank with a fixed number of iterations.
         *
@@ -126,6 +129,42 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                this.convergenceThreshold = convergenceThreshold;
        }
 
+       /**
+        * This PageRank implementation properly handles both source and sink
+        * vertices which have, respectively, only outgoing and incoming edges.
+        *
+        * <p>Setting this flag includes "zero-degree" vertices in the PageRank
+        * computation and result. These vertices are handled the same as other
+        * "source" vertices (with a consistent score of
+        * <code>(1 - damping factor) / number of vertices</code>) but only
+        * affect the scores of other vertices indirectly through the taking of
+        * this proportional portion of the "random jump" score.
+        *
+        * <p>The cost to include zero-degree vertices is a reduce for 
uniqueness
+        * on the vertex set followed by an outer join on the vertex degree
+        * DataSet.
+        *
+        * @param includeZeroDegreeVertices whether to include zero-degree 
vertices in the iterative computation
+        * @return this
+        */
+       public PageRank<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
+               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+               return this;
+       }
+
+       @Override
+       protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase 
other) {
+               if (!super.canMergeConfigurationWith(other)) {
+                       return false;
+               }
+
+               PageRank rhs = (PageRank) other;
+
+               return dampingFactor == rhs.dampingFactor &&
+                       includeZeroDegreeVertices == 
rhs.includeZeroDegreeVertices;
+       }
+
        @Override
        protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
                super.mergeConfiguration(other);
@@ -142,6 +181,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                // vertex degree
                DataSet<Vertex<K, Degrees>> vertexDegree = input
                        .run(new VertexDegrees<K, VV, EV>()
+                               
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
                                .setParallelism(parallelism));
 
                // vertex count
@@ -158,7 +198,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                // vertices with zero in-edges
                DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
                        .flatMap(new InitializeSourceVertices<>())
-                       .withBroadcastSet(vertexCount, VERTEX_COUNT)
                                .setParallelism(parallelism)
                                .name("Initialize source vertex scores");
 
@@ -285,7 +324,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        super.open(parameters);
 
                        Collection<LongValue> vertexCount = 
getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
-                       output.f1 = new DoubleValue(1.0 / 
vertexCount.iterator().next().getValue());
+                       Iterator<LongValue> vertexCountIterator = 
vertexCount.iterator();
+                       output.f1 = new 
DoubleValue(vertexCountIterator.hasNext() ? 1.0 / 
vertexCountIterator.next().getValue() : Double.NaN);
                }
 
                @Override
@@ -376,11 +416,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                        super.open(parameters);
 
                        Collection<Tuple2<T, DoubleValue>> sumOfScores = 
getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
+                       Iterator<Tuple2<T, DoubleValue>> sumOfScoresIterator = 
sumOfScores.iterator();
                        // floating point precision error is also included in 
sumOfSinks
-                       double sumOfSinks = 1 - 
sumOfScores.iterator().next().f1.getValue();
+                       double sumOfSinks = 1 - (sumOfScoresIterator.hasNext() 
? sumOfScoresIterator.next().f1.getValue() : 0);
 
                        Collection<LongValue> vertexCount = 
getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
-                       this.vertexCount = 
vertexCount.iterator().next().getValue();
+                       Iterator<LongValue> vertexCountIterator = 
vertexCount.iterator();
+                       this.vertexCount = vertexCountIterator.hasNext() ? 
vertexCountIterator.next().getValue() : 0;
 
                        this.uniformlyDistributedScore = ((1 - dampingFactor) + 
dampingFactor * sumOfSinks) / this.vertexCount;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 7294fd1..796667f 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -310,6 +310,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 97ee6fa..87c50ce 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -307,6 +307,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 8c520e6..3392d47 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -283,6 +283,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 1116149..39fec4a 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -257,6 +257,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
                }
 
                @Override
+               public String toString() {
+                       return toPrintableString();
+               }
+
+               @Override
                public String toPrintableString() {
                        NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 752e206..701e698 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -410,7 +410,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                @Override
                public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, 
Collector<Result<T>> out)
                                throws Exception {
-                       float sum = 0;
+                       double sum = 0;
                        Tuple3<T, T, FloatValue> edge = null;
 
                        for (Tuple3<T, T, FloatValue> next : values) {
@@ -421,7 +421,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        if (sum >= minimumScore) {
                                output.setVertexId0(edge.f0);
                                output.setVertexId1(edge.f1);
-                               output.setAdamicAdarScore(sum);
+                               output.setAdamicAdarScore((float) sum);
                                out.collect(output);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 1afb5da..1b6acbc 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.generator.CompleteGraph;
 import org.apache.flink.graph.generator.EmptyGraph;
 import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.StarGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -56,11 +57,17 @@ public class AsmTestBase {
        // empty graph
        protected final long emptyGraphVertexCount = 3;
 
-       protected Graph<LongValue, NullValue, NullValue> emptyGraph;
+       protected Graph<LongValue, NullValue, NullValue> emptyGraphWithVertices;
+
+       protected Graph<LongValue, NullValue, NullValue> 
emptyGraphWithoutVertices;
+
+       // star graph
+       protected final long starGraphVertexCount = 29;
+
+       protected Graph<LongValue, NullValue, NullValue> starGraph;
 
        @Before
-       public void setup()
-                       throws Exception {
+       public void setup() throws Exception {
                env = ExecutionEnvironment.createCollectionsEnvironment();
                env.getConfig().enableObjectReuse();
 
@@ -89,8 +96,16 @@ public class AsmTestBase {
                completeGraph = new CompleteGraph(env, completeGraphVertexCount)
                        .generate();
 
-               // empty graph
-               emptyGraph = new EmptyGraph(env, emptyGraphVertexCount)
+               // empty graph with vertices but no edges
+               emptyGraphWithVertices = new EmptyGraph(env, 
emptyGraphVertexCount)
+                       .generate();
+
+               // empty graph with no vertices or edges
+               emptyGraphWithoutVertices = new EmptyGraph(env, 0)
+                       .generate();
+
+               // star graph
+               starGraph = new StarGraph(env, starGraphVertexCount)
                        .generate();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
index 7d82b80..a31ce2e 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.asm.dataset;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
@@ -27,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -39,15 +42,13 @@ public class ChecksumHashCodeTest {
        private ExecutionEnvironment env;
 
        @Before
-       public void setup()
-                       throws Exception {
+       public void setup() throws Exception {
                env = ExecutionEnvironment.createCollectionsEnvironment();
                env.getConfig().enableObjectReuse();
        }
 
        @Test
-       public void testChecksumHashCode()
-                       throws Exception {
+       public void testList() throws Exception {
                List<Long> list = Arrays.asList(ArrayUtils.toObject(
                        new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
 
@@ -58,4 +59,14 @@ public class ChecksumHashCodeTest {
                assertEquals(list.size(), checksum.getCount());
                assertEquals(list.size() * (list.size() - 1) / 2, 
checksum.getChecksum());
        }
+
+       @Test
+       public void testEmptyList() throws Exception {
+               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), TypeInformation.of(new 
TypeHint<Long>(){}));
+
+               Checksum checksum = new 
ChecksumHashCode<Long>().run(dataset).execute();
+
+               assertEquals(0, checksum.getCount());
+               assertEquals(0, checksum.getChecksum());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
index 29b454b..cfeadce 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.asm.dataset;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -26,9 +28,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for {@link Collect}.
@@ -38,15 +42,13 @@ public class CollectTest {
        private ExecutionEnvironment env;
 
        @Before
-       public void setup()
-                       throws Exception {
+       public void setup() throws Exception {
                env = ExecutionEnvironment.createCollectionsEnvironment();
                env.getConfig().enableObjectReuse();
        }
 
        @Test
-       public void testCollect()
-                       throws Exception {
+       public void testList() throws Exception {
                List<Long> list = Arrays.asList(ArrayUtils.toObject(
                        new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
 
@@ -56,4 +58,13 @@ public class CollectTest {
 
                assertArrayEquals(list.toArray(), collected.toArray());
        }
+
+       @Test
+       public void testEmptyList() throws Exception {
+               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), TypeInformation.of(new 
TypeHint<Long>(){}));
+
+               List<Long> collected = new 
Collect<Long>().run(dataset).execute();
+
+               assertEquals(0, collected.size());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
index a1160ce..0167a5f 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.asm.dataset;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -26,6 +28,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -38,15 +41,13 @@ public class CountTest {
        private ExecutionEnvironment env;
 
        @Before
-       public void setup()
-                       throws Exception {
+       public void setup() throws Exception {
                env = ExecutionEnvironment.createCollectionsEnvironment();
                env.getConfig().enableObjectReuse();
        }
 
        @Test
-       public void testCount()
-                       throws Exception {
+       public void testList() throws Exception {
                List<Long> list = Arrays.asList(ArrayUtils.toObject(
                        new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
 
@@ -56,4 +57,13 @@ public class CountTest {
 
                assertEquals(list.size(), count);
        }
+
+       @Test
+       public void testEmptyList() throws Exception {
+               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), TypeInformation.of(new 
TypeHint<Long>(){}));
+
+               long count = new Count<Long>().run(dataset).execute();
+
+               assertEquals(0, count);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 63bf133..08ba4aa 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeDegreesPair}.
  */
-public class EdgeDegreesPairTest
-extends AsmTestBase {
+public class EdgeDegreesPairTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                String expectedResult =
                        "(0,1,((null),(2,2,0),(3,0,3)))\n" +
                        "(0,2,((null),(2,2,0),(3,2,1)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> 
degreesPair = emptyGraphWithVertices
+                       .run(new EdgeDegreesPair<>());
+
+               assertEquals(0, degreesPair.collect().size());
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> 
degreesPair = emptyGraphWithoutVertices
+                       .run(new EdgeDegreesPair<>());
+
+               assertEquals(0, degreesPair.collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> 
degreesPair = directedRMatGraph(10, 16)
                        .run(new EdgeDegreesPair<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 967cfb2..884e2d5 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeSourceDegrees}.
  */
-public class EdgeSourceDegreesTest
-extends AsmTestBase {
+public class EdgeSourceDegreesTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                String expectedResult =
                        "(0,1,((null),(2,2,0)))\n" +
                        "(0,2,((null),(2,2,0)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
sourceDegrees = emptyGraphWithVertices
+                       .run(new EdgeSourceDegrees<>());
+
+               assertEquals(0, sourceDegrees.collect().size());
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
sourceDegrees = emptyGraphWithoutVertices
+                       .run(new EdgeSourceDegrees<>());
+
+               assertEquals(0, sourceDegrees.collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
sourceDegrees = directedRMatGraph(10, 16)
                        .run(new EdgeSourceDegrees<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index abb76c4..5afd64c 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeTargetDegrees}.
  */
-public class EdgeTargetDegreesTest
-extends AsmTestBase {
+public class EdgeTargetDegreesTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                String expectedResult =
                        "(0,1,((null),(3,0,3)))\n" +
                        "(0,2,((null),(3,2,1)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
targetDegrees = emptyGraphWithVertices
+                       .run(new EdgeTargetDegrees<>());
+
+               assertEquals(0, targetDegrees.collect().size());
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
targetDegrees = emptyGraphWithoutVertices
+                       .run(new EdgeTargetDegrees<>());
+
+               assertEquals(0, targetDegrees.collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
targetDegrees = directedRMatGraph(10, 16)
                        .run(new EdgeTargetDegrees<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index 91f354f..7b89fe7 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexDegrees}.
  */
-public class VertexDegreesTest
-extends AsmTestBase {
+public class VertexDegreesTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleDirectedGraph()
-                       throws Exception {
+       public void testWithDirectedSimpleGraph() throws Exception {
                DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph
                        .run(new VertexDegrees<>());
 
@@ -57,8 +55,7 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithSimpleUndirectedGraph()
-                       throws Exception {
+       public void testWithUndirectedSimpleGraph() throws Exception {
                DataSet<Vertex<IntValue, Degrees>> degrees = 
undirectedSimpleGraph
                        .run(new VertexDegrees<>());
 
@@ -74,17 +71,14 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithEmptyGraph()
-                       throws Exception {
-               DataSet<Vertex<LongValue, Degrees>> degrees;
-
-               degrees = emptyGraph
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Vertex<LongValue, Degrees>> 
degreesWithoutZeroDegreeVertices = emptyGraphWithVertices
                        .run(new VertexDegrees<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(false));
 
-               assertEquals(0, degrees.collect().size());
+               assertEquals(0, 
degreesWithoutZeroDegreeVertices.collect().size());
 
-               degrees = emptyGraph
+               DataSet<Vertex<LongValue, Degrees>> 
degreesWithZeroDegreeVertices = emptyGraphWithVertices
                        .run(new VertexDegrees<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(true));
 
@@ -93,12 +87,26 @@ extends AsmTestBase {
                        "(1,(0,0,0))\n" +
                        "(2,(0,0,0))";
 
-               TestBaseUtils.compareResultAsText(degrees.collect(), 
expectedResult);
+               
TestBaseUtils.compareResultAsText(degreesWithZeroDegreeVertices.collect(), 
expectedResult);
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Vertex<LongValue, Degrees>> 
degreesWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+                       .run(new VertexDegrees<LongValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(false));
+
+               assertEquals(0, 
degreesWithoutZeroDegreeVertices.collect().size());
+
+               DataSet<Vertex<LongValue, Degrees>> 
degreesWithZeroDegreeVertices = emptyGraphWithoutVertices
+                       .run(new VertexDegrees<LongValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
+
+               assertEquals(0, degreesWithZeroDegreeVertices.collect().size());
        }
 
        @Test
-       public void testWithRMatGraph()
-       throws Exception {
+       public void testWithRMatGraph() throws Exception {
                DataSet<Vertex<LongValue, Degrees>> degrees = 
directedRMatGraph(10, 16)
                        .run(new VertexDegrees<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index f671cab..7f6c9e3 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexInDegree}.
  */
-public class VertexInDegreeTest
-extends AsmTestBase {
+public class VertexInDegreeTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithDirectedSimpleGraph() throws Exception {
                DataSet<Vertex<IntValue, LongValue>> inDegree = 
directedSimpleGraph
                        .run(new VertexInDegree<IntValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(true));
@@ -57,17 +55,31 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithEmptyGraph()
-                       throws Exception {
-               DataSet<Vertex<LongValue, LongValue>> inDegree;
+       public void testWithUndirectedSimpleGraph() throws Exception {
+               DataSet<Vertex<IntValue, LongValue>> inDegree = 
undirectedSimpleGraph
+                       .run(new VertexInDegree<IntValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
+
+               String expectedResult =
+                       "(0,2)\n" +
+                       "(1,3)\n" +
+                       "(2,3)\n" +
+                       "(3,4)\n" +
+                       "(4,1)\n" +
+                       "(5,1)";
 
-               inDegree = emptyGraph
+               TestBaseUtils.compareResultAsText(inDegree.collect(), 
expectedResult);
+       }
+
+       @Test
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Vertex<LongValue, LongValue>> 
inDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices
                        .run(new VertexInDegree<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(false));
 
-               assertEquals(0, inDegree.collect().size());
+               assertEquals(0, 
inDegreeWithoutZeroDegreeVertices.collect().size());
 
-               inDegree = emptyGraph
+               DataSet<Vertex<LongValue, LongValue>> 
inDegreeWithZeroDegreeVertices = emptyGraphWithVertices
                        .run(new VertexInDegree<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(true));
 
@@ -76,12 +88,26 @@ extends AsmTestBase {
                        "(1,0)\n" +
                        "(2,0)";
 
-               TestBaseUtils.compareResultAsText(inDegree.collect(), 
expectedResult);
+               
TestBaseUtils.compareResultAsText(inDegreeWithZeroDegreeVertices.collect(), 
expectedResult);
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Vertex<LongValue, LongValue>> 
inDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+                       .run(new VertexInDegree<LongValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(false));
+
+               assertEquals(0, 
inDegreeWithoutZeroDegreeVertices.collect().size());
+
+               DataSet<Vertex<LongValue, LongValue>> 
inDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices
+                       .run(new VertexInDegree<LongValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
+
+               assertEquals(0, 
inDegreeWithZeroDegreeVertices.collect().size());
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithRMatGraph() throws Exception {
                DataSet<Vertex<LongValue, LongValue>> inDegree = 
directedRMatGraph(10, 16)
                        .run(new VertexInDegree<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(true));

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index 1517f23..7031d8f 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexOutDegree}.
  */
-public class VertexOutDegreeTest
-extends AsmTestBase {
+public class VertexOutDegreeTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithDirectedSimpleGraph() throws Exception {
                DataSet<Vertex<IntValue, LongValue>> outDegree = 
directedSimpleGraph
                        .run(new VertexOutDegree<IntValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(true));
@@ -57,17 +55,31 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithEmptyGraph()
-                       throws Exception {
-               DataSet<Vertex<LongValue, LongValue>> outDegree;
+       public void testWithUndirectedSimpleGraph() throws Exception {
+               DataSet<Vertex<IntValue, LongValue>> outDegree = 
undirectedSimpleGraph
+                       .run(new VertexOutDegree<IntValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
+
+               String expectedResult =
+                       "(0,2)\n" +
+                       "(1,3)\n" +
+                       "(2,3)\n" +
+                       "(3,4)\n" +
+                       "(4,1)\n" +
+                       "(5,1)";
 
-               outDegree = emptyGraph
+               TestBaseUtils.compareResultAsText(outDegree.collect(), 
expectedResult);
+       }
+
+       @Test
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Vertex<LongValue, LongValue>> 
outDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices
                        .run(new VertexOutDegree<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(false));
 
-               assertEquals(0, outDegree.collect().size());
+               assertEquals(0, 
outDegreeWithoutZeroDegreeVertices.collect().size());
 
-               outDegree = emptyGraph
+               DataSet<Vertex<LongValue, LongValue>> 
outDegreeWithZeroDegreeVertices = emptyGraphWithVertices
                        .run(new VertexOutDegree<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(true));
 
@@ -76,12 +88,26 @@ extends AsmTestBase {
                        "(1,0)\n" +
                        "(2,0)";
 
-               TestBaseUtils.compareResultAsText(outDegree.collect(), 
expectedResult);
+               
TestBaseUtils.compareResultAsText(outDegreeWithZeroDegreeVertices.collect(), 
expectedResult);
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Vertex<LongValue, LongValue>> 
outDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+                       .run(new VertexOutDegree<LongValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(false));
+
+               assertEquals(0, 
outDegreeWithoutZeroDegreeVertices.collect().size());
+
+               DataSet<Vertex<LongValue, LongValue>> 
outDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices
+                       .run(new VertexOutDegree<LongValue, NullValue, 
NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
+
+               assertEquals(0, 
outDegreeWithZeroDegreeVertices.collect().size());
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithRMatGraph() throws Exception {
                DataSet<Vertex<LongValue, LongValue>> outDegree = 
directedRMatGraph(10, 16)
                        .run(new VertexOutDegree<LongValue, NullValue, 
NullValue>()
                                .setIncludeZeroDegreeVertices(true));

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
index 1cae2e7..95a89c5 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeDegreePair}.
  */
-public class EdgeDegreePairTest
-extends AsmTestBase {
+public class EdgeDegreePairTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                String expectedResult =
                        "(0,1,((null),2,3))\n" +
                        "(0,2,((null),2,3))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
                        "(5,3,((null),1,4))";
 
                DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, 
LongValue>>> degreePairOnSourceId = undirectedSimpleGraph
-                       .run(new EdgeDegreePair<>());
+                       .run(new EdgeDegreePair<IntValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
 
                
TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), 
expectedResult);
 
@@ -71,10 +70,40 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, 
LongValue>>> degreePairOnSourceId = emptyGraphWithVertices
+                       .run(new EdgeDegreePair<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
+
+               assertEquals(0, degreePairOnSourceId.collect().size());
+
+               DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, 
LongValue>>> degreePairOnTargetId = emptyGraphWithVertices
+                       .run(new EdgeDegreePair<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(true));
+
+               assertEquals(0, degreePairOnTargetId.collect().size());
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, 
LongValue>>> degreePairOnSourceId = emptyGraphWithoutVertices
+                       .run(new EdgeDegreePair<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
+
+               assertEquals(0, degreePairOnSourceId.collect().size());
+
+               DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, 
LongValue>>> degreePairOnTargetId = emptyGraphWithoutVertices
+                       .run(new EdgeDegreePair<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(true));
+
+               assertEquals(0, degreePairOnTargetId.collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, 
LongValue>>> degreePairOnSourceId = undirectedRMatGraph(10, 16)
-                       .run(new EdgeDegreePair<>());
+                       .run(new EdgeDegreePair<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
 
                Checksum checksumOnSourceId = new 
ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
                        .run(degreePairOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
index 2d8b2e3..3802b4f 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeSourceDegree}.
  */
-public class EdgeSourceDegreeTest
-extends AsmTestBase {
+public class EdgeSourceDegreeTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                String expectedResult =
                        "(0,1,((null),2))\n" +
                        "(0,2,((null),2))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
                        "(5,3,((null),1))";
 
                DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> 
sourceDegreeOnSourceId = undirectedSimpleGraph
-                       .run(new EdgeSourceDegree<>());
+                       .run(new EdgeSourceDegree<IntValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
 
                
TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), 
expectedResult);
 
@@ -71,10 +70,40 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
sourceDegreeOnSourceId = emptyGraphWithVertices
+                       .run(new EdgeSourceDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
+
+               assertEquals(0, sourceDegreeOnSourceId.collect().size());
+
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
sourceDegreeOnTargetId = emptyGraphWithVertices
+                       .run(new EdgeSourceDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(true));
+
+               assertEquals(0, sourceDegreeOnTargetId.collect().size());
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
sourceDegreeOnSourceId = emptyGraphWithoutVertices
+                       .run(new EdgeSourceDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
+
+               assertEquals(0, sourceDegreeOnSourceId.collect().size());
+
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
sourceDegreeOnTargetId = emptyGraphWithoutVertices
+                       .run(new EdgeSourceDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(true));
+
+               assertEquals(0, sourceDegreeOnTargetId.collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
sourceDegreeOnSourceId = undirectedRMatGraph(10, 16)
-                       .run(new EdgeSourceDegree<>());
+                       .run(new EdgeSourceDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnTargetId(false));
 
                Checksum checksumOnSourceId = new 
ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
                        .run(sourceDegreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
index a7c88a1..51f880b 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeTargetDegree}.
  */
-public class EdgeTargetDegreeTest
-extends AsmTestBase {
+public class EdgeTargetDegreeTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                String expectedResult =
                        "(0,1,((null),3))\n" +
                        "(0,2,((null),3))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
                        "(5,3,((null),4))";
 
                DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> 
targetDegreeOnTargetId = undirectedSimpleGraph
-                               .run(new EdgeTargetDegree<>());
+                       .run(new EdgeTargetDegree<IntValue, NullValue, 
NullValue>()
+                               .setReduceOnSourceId(false));
 
                
TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), 
expectedResult);
 
@@ -71,10 +70,40 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
targetDegreeOnTargetId = emptyGraphWithVertices
+                       .run(new EdgeTargetDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnSourceId(false));
+
+               assertEquals(0, targetDegreeOnTargetId.collect().size());
+
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
targetDegreeOnSourceId = emptyGraphWithVertices
+                       .run(new EdgeTargetDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnSourceId(true));
+
+               assertEquals(0, targetDegreeOnSourceId.collect().size());
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
targetDegreeOnTargetId = emptyGraphWithoutVertices
+                       .run(new EdgeTargetDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnSourceId(false));
+
+               assertEquals(0, targetDegreeOnTargetId.collect().size());
+
+               DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
targetDegreeOnSourceId = emptyGraphWithoutVertices
+                       .run(new EdgeTargetDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnSourceId(true));
+
+               assertEquals(0, targetDegreeOnSourceId.collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> 
targetDegreeOnTargetId = undirectedRMatGraph(10, 16)
-                       .run(new EdgeSourceDegree<>());
+                       .run(new EdgeTargetDegree<LongValue, NullValue, 
NullValue>()
+                               .setReduceOnSourceId(false));
 
                Checksum checksumOnTargetId = new 
ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
                        .run(targetDegreeOnTargetId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index bc76bff..49f0007 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexDegree}.
  */
-public class VertexDegreeTest
-extends AsmTestBase {
+public class VertexDegreeTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                String expectedResult =
                        "(0,2)\n" +
                        "(1,3)\n" +
@@ -50,7 +48,8 @@ extends AsmTestBase {
                        "(5,1)";
 
                DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = 
undirectedSimpleGraph
-                       .run(new VertexDegree<>());
+                       .run(new VertexDegree<IntValue, NullValue, NullValue>()
+                               .setReduceOnTargetId(false));
 
                TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), 
expectedResult);
 
@@ -62,12 +61,12 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithCompleteGraph()
-                       throws Exception {
+       public void testWithCompleteGraph() throws Exception {
                long expectedDegree = completeGraphVertexCount - 1;
 
                DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = 
completeGraph
-                       .run(new VertexDegree<>());
+                       .run(new VertexDegree<LongValue, NullValue, NullValue>()
+                               .setReduceOnTargetId(false));
 
                for (Vertex<LongValue, LongValue> vertex : 
degreeOnSourceId.collect()) {
                        assertEquals(expectedDegree, 
vertex.getValue().getValue());
@@ -83,17 +82,16 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithEmptyGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
                DataSet<Vertex<LongValue, LongValue>> degree;
 
-               degree = emptyGraph
+               degree = emptyGraphWithVertices
                        .run(new VertexDegree<LongValue, NullValue, NullValue>()
                                .setIncludeZeroDegreeVertices(false));
 
                assertEquals(0, degree.collect().size());
 
-               degree = emptyGraph
+               degree = emptyGraphWithVertices
                        .run(new VertexDegree<LongValue, NullValue, NullValue>()
                                .setIncludeZeroDegreeVertices(true));
 
@@ -106,10 +104,27 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               DataSet<Vertex<LongValue, LongValue>> degree;
+
+               degree = emptyGraphWithoutVertices
+                       .run(new VertexDegree<LongValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(false));
+
+               assertEquals(0, degree.collect().size());
+
+               degree = emptyGraphWithoutVertices
+                       .run(new VertexDegree<LongValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
+
+               assertEquals(0, degree.collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = 
undirectedRMatGraph(10, 16)
-                       .run(new VertexDegree<>());
+                       .run(new VertexDegree<LongValue, NullValue, NullValue>()
+                               .setReduceOnTargetId(false));
 
                Checksum checksumOnSourceId = new 
ChecksumHashCode<Vertex<LongValue, LongValue>>()
                        .run(degreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index 51e7712..f43be9c 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.metric.ChecksumHashCode;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
 import org.junit.Test;
@@ -33,12 +34,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link MaximumDegree}.
  */
-public class MaximumDegreeTest
-extends AsmTestBase {
+public class MaximumDegreeTest extends AsmTestBase {
 
        @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
+       public void testWithSimpleGraph() throws Exception {
                Graph<IntValue, NullValue, NullValue> graph = 
undirectedSimpleGraph
                        .run(new MaximumDegree<>(3));
 
@@ -63,8 +62,25 @@ extends AsmTestBase {
        }
 
        @Test
-       public void testWithRMatGraph()
-                       throws Exception {
+       public void testWithEmptyGraphWithVertices() throws Exception {
+               Graph<LongValue, NullValue, NullValue> graph = 
emptyGraphWithVertices
+                       .run(new MaximumDegree<>(1));
+
+               assertEquals(emptyGraphVertexCount, 
graph.getVertices().collect().size());
+               assertEquals(0, graph.getEdges().collect().size());
+       }
+
+       @Test
+       public void testWithEmptyGraphWithoutVertices() throws Exception {
+               Graph<LongValue, NullValue, NullValue> graph = 
emptyGraphWithoutVertices
+                       .run(new MaximumDegree<>(1));
+
+               assertEquals(0, graph.getVertices().collect().size());
+               assertEquals(0, graph.getEdges().collect().size());
+       }
+
+       @Test
+       public void testWithRMatGraph() throws Exception {
                Checksum checksum = undirectedRMatGraph(10, 16)
                        .run(new MaximumDegree<>(16))
                        .run(new ChecksumHashCode<>())

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
index 751d030..f31c09a 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -62,8 +62,7 @@ public class SimplifyTest {
        }
 
        @Test
-       public void test()
-                       throws Exception {
+       public void test() throws Exception {
                String expectedResult =
                        "(0,1,(null))\n" +
                        "(0,2,(null))\n" +

Reply via email to