[FLINK-3945] [gelly] Degree annotation for directed graphs

This closes #2021


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65545c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65545c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65545c2e

Branch: refs/heads/master
Commit: 65545c2ed46c17df6abd85299b91bad2529cd42c
Parents: 36ad78c
Author: Greg Hogan <[email protected]>
Authored: Fri May 20 12:54:16 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Thu Jun 2 10:22:29 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  55 +++-
 .../java/org/apache/flink/graph/EdgeOrder.java  |  47 ++++
 .../annotate/DegreeAnnotationFunctions.java     |  88 +------
 .../annotate/directed/EdgeDegreesPair.java      |  81 ++++++
 .../annotate/directed/EdgeSourceDegrees.java    |  75 ++++++
 .../annotate/directed/EdgeTargetDegrees.java    |  75 ++++++
 .../annotate/directed/VertexDegreePair.java     | 107 --------
 .../degree/annotate/directed/VertexDegrees.java | 262 +++++++++++++++++++
 .../annotate/directed/VertexInDegree.java       |   7 +-
 .../annotate/directed/VertexOutDegree.java      |   7 +-
 .../graph/asm/degree/annotate/package-info.java |  27 +-
 .../annotate/undirected/EdgeDegreePair.java     |   9 +-
 .../annotate/undirected/EdgeSourceDegree.java   |   4 +-
 .../annotate/undirected/EdgeTargetDegree.java   |   4 +-
 .../annotate/undirected/VertexDegree.java       |   3 +-
 .../annotate/directed/EdgeDegreesPairTest.java  |  66 +++++
 .../directed/EdgeSourceDegreesTest.java         |  66 +++++
 .../directed/EdgeTargetDegreesTest.java         |  66 +++++
 .../annotate/directed/VertexDegreePairTest.java |  87 ------
 .../annotate/directed/VertexDegreesTest.java    | 104 ++++++++
 .../annotate/directed/VertexInDegreeTest.java   |   7 +-
 .../annotate/directed/VertexOutDegreeTest.java  |   6 +-
 22 files changed, 947 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index aadbd44..05fbcb5 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2164,12 +2164,12 @@ DataSet<Vertex<K, LongValue>> outDegree = graph
     </tr>
 
     <tr>
-      <td>degree.annotate.directed.<br/><strong>VertexDegreePair</strong></td>
+      <td>degree.annotate.directed.<br/><strong>VertexDegrees</strong></td>
       <td>
-        <p>Annotate vertices of a <a href="#graph-representation">directed 
graph</a> with both the out-degree and in-degree.</p>
+        <p>Annotate vertices of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree.</p>
 {% highlight java %}
-DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> pairDegree = graph
-  .run(new VertexDegreePair()
+DataSet<Vertex<K, Degrees>> degrees = graph
+  .run(new VertexDegrees()
     .setIncludeZeroDegreeVertices(true));
 {% endhighlight %}
         <p>Optional configuration:</p>
@@ -2181,6 +2181,51 @@ DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> 
pairDegree = graph
     </tr>
 
     <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeSourceDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree of the source ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> sourceDegrees = graph
+  .run(new EdgeSourceDegrees());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeTargetDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree of the target ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> targetDegrees = graph
+  .run(new EdgeTargetDegrees());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeDegreesPair</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed 
graph</a> with the degree, out-degree, and in-degree of both the source and 
target vertices.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> degrees = graph
+  .run(new EdgeDegreesPair());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator 
parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
       <td>degree.annotate.undirected.<br/><strong>VertexDegree</strong></td>
       <td>
         <p>Annotate vertices of an <a href="#graph-representation">undirected 
graph</a> with the degree.</p>
@@ -2236,7 +2281,7 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = 
graph
     <tr>
       <td>degree.annotate.undirected.<br/><strong>EdgeDegreePair</strong></td>
       <td>
-        <p>Annotate edges of an <a href="#graph-representation">undirected 
graph</a> with the degree of both the source and target degree ID.</p>
+        <p>Annotate edges of an <a href="#graph-representation">undirected 
graph</a> with the degree of both the source and target vertices.</p>
 {% highlight java %}
 DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
   .run(new EdgeDegreePair()

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
new file mode 100644
index 0000000..08e955a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * These bitmasks are used by edge-flipping algorithms to mark the edge order
+ * relative to the original edge direction.
+ */
+public enum EdgeOrder {
+
+       FORWARD(0b01),
+       REVERSE(0b10),
+       MUTUAL(0b11);
+
+       private final byte bitmask;
+
+       EdgeOrder(int bitmask) {
+               this.bitmask = (byte)bitmask;
+       }
+
+       /**
+        * Returns a bitmask used for marking whether an edge is in the same
+        * direction as in the original edge set (FORWARD), is flipped relative
+        * to the original edge set (REVERSE), or both (MUTUAL).
+        *
+        * @return edge order bitmask
+        */
+       public byte getBitmask() {
+               return bitmask;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
index 098e9fe..b91b4cb 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
@@ -98,7 +98,7 @@ public class DegreeAnnotationFunctions {
         */
        @ForwardedFieldsFirst("0")
        @ForwardedFieldsSecond("0")
-       public static final class JoinVertexWithVertexDegree<K, VV>
+       public static class JoinVertexWithVertexDegree<K, VV>
        implements JoinFunction<Vertex<K, VV>, Vertex<K, LongValue>, Vertex<K, 
LongValue>> {
                private LongValue zero = new LongValue(0);
 
@@ -114,68 +114,6 @@ public class DegreeAnnotationFunctions {
                }
        }
 
-       /**
-        * Performs a left outer join to apply a zero count for vertices with
-        * out- and in-degree of zero.
-        *
-        * @param <K> ID type
-        * @param <VV> vertex value type
-        */
-       @ForwardedFieldsFirst("0")
-       @ForwardedFieldsSecond("0")
-       public static final class JoinVertexWithVertexDegrees<K, VV>
-       implements JoinFunction<Vertex<K, VV>, Vertex<K, Tuple2<LongValue, 
LongValue>>, Vertex<K, Tuple2<LongValue, LongValue>>> {
-               private Tuple2<LongValue, LongValue> zeros;
-
-               private Vertex<K, Tuple2<LongValue, LongValue>> output = new 
Vertex<>();
-
-               public JoinVertexWithVertexDegrees() {
-                       LongValue zero = new LongValue(0);
-                       zeros = new Tuple2<>(zero, zero);
-               }
-               @Override
-               public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, 
VV> vertex, Vertex<K, Tuple2<LongValue, LongValue>> vertexDegree)
-                               throws Exception {
-                       output.f0 = vertex.f0;
-                       output.f1 = (vertexDegree == null) ? zeros : 
vertexDegree.f1;
-
-                       return output;
-               }
-       }
-
-       /**
-        * Performs a full outer join composing vertex out- and in-degree and
-        * applying a zero count for vertices having an out- or in-degree of 
zero.
-        *
-        * @param <K> ID type
-        */
-       @ForwardedFieldsFirst("0")
-       @ForwardedFieldsSecond("0")
-       public static final class JoinVertexDegreeWithVertexDegree<K>
-       implements JoinFunction<Vertex<K, LongValue>, Vertex<K, LongValue>, 
Vertex<K, Tuple2<LongValue, LongValue>>> {
-               private LongValue zero = new LongValue(0);
-
-               private Tuple2<LongValue, LongValue> degrees = new Tuple2<>();
-
-               private Vertex<K, Tuple2<LongValue, LongValue>> output = new 
Vertex<>(null, degrees);
-
-               @Override
-               public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, 
LongValue> left, Vertex<K, LongValue> right)
-                               throws Exception {
-                       if (left == null) {
-                               output.f0 = right.f0;
-                               degrees.f0 = zero;
-                               degrees.f1 = right.f1;
-                       } else {
-                               output.f0 = left.f0;
-                               degrees.f0 = left.f1;
-                               degrees.f1 = (right == null) ? zero : right.f1;
-                       }
-
-                       return output;
-               }
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Edge functions
        // 
--------------------------------------------------------------------------------------------
@@ -185,17 +123,18 @@ public class DegreeAnnotationFunctions {
         *
         * @param <K> ID type
         * @param <EV> edge value type
+        * @param <D> degree type
         */
        @ForwardedFieldsFirst("0; 1; 2->2.0")
        @ForwardedFieldsSecond("0; 1->2.1")
-       public static final class JoinEdgeWithVertexDegree<K, EV>
-       implements JoinFunction<Edge<K, EV>, Vertex<K, LongValue>, Edge<K, 
Tuple2<EV, LongValue>>> {
-               private Tuple2<EV, LongValue> valueAndDegree = new Tuple2<>();
+       public static class JoinEdgeWithVertexDegree<K, EV, D>
+       implements JoinFunction<Edge<K, EV>, Vertex<K, D>, Edge<K, Tuple2<EV, 
D>>> {
+               private Tuple2<EV, D> valueAndDegree = new Tuple2<>();
 
-               private Edge<K, Tuple2<EV, LongValue>> output = new 
Edge<>(null, null, valueAndDegree);
+               private Edge<K, Tuple2<EV, D>> output = new Edge<>(null, null, 
valueAndDegree);
 
                @Override
-               public Edge<K, Tuple2<EV, LongValue>> join(Edge<K, EV> edge, 
Vertex<K, LongValue> vertex) throws Exception {
+               public Edge<K, Tuple2<EV, D>> join(Edge<K, EV> edge, Vertex<K, 
D> vertex) throws Exception {
                        output.f0 = edge.f0;
                        output.f1 = edge.f1;
                        valueAndDegree.f0 = edge.f2;
@@ -210,19 +149,20 @@ public class DegreeAnnotationFunctions {
         *
         * @param <K> ID type
         * @param <EV> edge value type
+        * @param <D> degree type
         */
        @ForwardedFieldsFirst("0; 1; 2.0; 2.1")
        @ForwardedFieldsSecond("0; 1->2.2")
-       public static final class JoinEdgeDegreeWithVertexDegree<K, EV>
-       implements JoinFunction<Edge<K, Tuple2<EV, LongValue>>, Vertex<K, 
LongValue>, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
-               private Tuple3<EV, LongValue, LongValue> valueAndDegrees = new 
Tuple3<>();
+       public static class JoinEdgeDegreeWithVertexDegree<K, EV, D>
+       implements JoinFunction<Edge<K, Tuple2<EV, D>>, Vertex<K, D>, Edge<K, 
Tuple3<EV, D, D>>> {
+               private Tuple3<EV, D, D> valueAndDegrees = new Tuple3<>();
 
-               private Edge<K, Tuple3<EV, LongValue, LongValue>> output = new 
Edge<>(null, null, valueAndDegrees);
+               private Edge<K, Tuple3<EV, D, D>> output = new Edge<>(null, 
null, valueAndDegrees);
 
                @Override
-               public Edge<K, Tuple3<EV, LongValue, LongValue>> join(Edge<K, 
Tuple2<EV, LongValue>> edge, Vertex<K, LongValue> vertex)
+               public Edge<K, Tuple3<EV, D, D>> join(Edge<K, Tuple2<EV, D>> 
edge, Vertex<K, D> vertex)
                                throws Exception {
-                       Tuple2<EV, LongValue> valueAndDegree = edge.f2;
+                       Tuple2<EV, D> valueAndDegree = edge.f2;
 
                        output.f0 = edge.f0;
                        output.f1 = edge.f1;

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
new file mode 100644
index 0000000..5aa5ca4
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, 
Degrees>>>> {
+
+       // Optional configuration
+       private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, 
EV> input)
+                       throws Exception {
+               // s, t, d(s)
+               DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+                       .run(new EdgeSourceDegrees<K, VV, EV>()
+                               .setParallelism(parallelism));
+
+               // t, d(t)
+               DataSet<Vertex<K, Degrees>> vertexDegrees = input
+                       .run(new VertexDegrees<K, VV, EV>()
+                               .setParallelism(parallelism));
+
+               // s, t, (d(s), d(t))
+               return edgeSourceDegrees
+                       .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+                       .where(1)
+                       .equalTo(0)
+                       .with(new JoinEdgeDegreeWithVertexDegree<K, EV, 
Degrees>())
+                               .setParallelism(parallelism)
+                               .name("Edge target degree");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
new file mode 100644
index 0000000..85ba0ed
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of the source vertex.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeSourceDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+
+       // Optional configuration
+       private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public EdgeSourceDegrees<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               // s, d(s)
+               DataSet<Vertex<K, Degrees>> vertexDegrees = input
+                       .run(new VertexDegrees<K, VV, EV>()
+                               .setParallelism(parallelism));
+
+               // s, t, d(s)
+               return input.getEdges()
+                       .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+                       .where(0)
+                       .equalTo(0)
+                       .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+                               .setParallelism(parallelism)
+                               .name("Edge source degrees");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
new file mode 100644
index 0000000..6d72e44
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of the target vertex.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeTargetDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+
+       // Optional configuration
+       private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public EdgeTargetDegrees<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               // t, d(t)
+               DataSet<Vertex<K, Degrees>> vertexDegrees = input
+                       .run(new VertexDegrees<K, VV, EV>()
+                               .setParallelism(parallelism));
+
+               // s, t, d(t)
+               return input.getEdges()
+                       .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+                       .where(1)
+                       .equalTo(0)
+                       .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+                               .setParallelism(parallelism)
+                               .name("Edge target degrees");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
deleted file mode 100644
index 3e2cae0..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.degree.annotate.directed;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexDegreeWithVertexDegree;
-import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegrees;
-import org.apache.flink.types.LongValue;
-
-/**
- * Annotates vertices of a directed graph with both the out-degree and 
in-degree.
- *
- * @param <K> ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- */
-public class VertexDegreePair<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<LongValue, 
LongValue>>>> {
-
-       // Optional configuration
-       private boolean includeZeroDegreeVertices = false;
-
-       private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
-
-       /**
-        * By default only the edge set is processed for the computation of 
degree.
-        * When this flag is set an additional join is performed against the 
vertex
-        * set in order to output vertices with out- and in-degree of zero.
-        *
-        * @param includeZeroDegreeVertices whether to output vertices with out-
-        *                                  and in-degree of zero
-        * @return this
-        */
-       public VertexDegreePair<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
-               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
-
-               return this;
-       }
-
-       /**
-        * Override the operator parallelism.
-        *
-        * @param parallelism operator parallelism
-        * @return this
-        */
-       public VertexDegreePair<K, VV, EV> setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-
-               return this;
-       }
-
-       @Override
-       public DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> run(Graph<K, 
VV, EV> input)
-                       throws Exception {
-               // s, deg(s)
-               DataSet<Vertex<K, LongValue>> outDegree = input
-                       .run(new VertexOutDegree<K, VV, EV>()
-                               .setIncludeZeroDegreeVertices(false));
-
-               // t, deg(t)
-               DataSet<Vertex<K, LongValue>> inDegree = input
-                       .run(new VertexInDegree<K, VV, EV>()
-                               .setIncludeZeroDegreeVertices(false));
-
-               DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degree = 
outDegree
-                       .fullOuterJoin(inDegree)
-                       .where(0)
-                       .equalTo(0)
-                       .with(new JoinVertexDegreeWithVertexDegree<K>())
-                               .setParallelism(parallelism)
-                               .name("Join out- and in-degree");
-
-               if (includeZeroDegreeVertices) {
-                       degree = input
-                               .getVertices()
-                               .leftOuterJoin(degree)
-                               .where(0)
-                               .equalTo(0)
-                               .with(new JoinVertexWithVertexDegrees<K, VV>())
-                                       .setParallelism(parallelism)
-                                       .name("Join zero degree vertices");
-               }
-
-               return degree;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
new file mode 100644
index 0000000..aab0eb6
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeOrder;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Annotates vertices of a directed graph with the degree, out-, and in-degree.
+ *
+ * @param <K> graph label type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class VertexDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
+
+       // Optional configuration
+       private boolean includeZeroDegreeVertices = false;
+
+       private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+       /**
+        * By default only the edge set is processed for the computation of 
degree.
+        * When this flag is set an additional join is performed against the 
vertex
+        * set in order to output vertices with an out- and in-degree of zero.
+        *
+        * @param includeZeroDegreeVertices whether to output vertices with an
+        *                                  out- and in-degree of zero
+        * @return this
+        */
+       public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
+               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+               return this;
+       }
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public VertexDegrees<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       @Override
+       public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               // s, t, bitmask
+               DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = 
input.getEdges()
+                       .flatMap(new EmitAndFlipEdge<K, EV>())
+                               .setParallelism(parallelism)
+                               .name("Emit and flip edge")
+                       .groupBy(0, 1)
+                               .reduce(new ReduceBitmask<K>())
+                               .setParallelism(parallelism)
+                               .name("Reduce bitmask");
+
+               // s, d(s)
+               DataSet<Vertex<K, Degrees>> vertexDegrees = edgesWithOrder
+                       .groupBy(0)
+                       .sortGroup(1, Order.ASCENDING)
+                       .reduceGroup(new DegreeCount<K>())
+                               .setParallelism(parallelism)
+                               .name("Degree count");
+
+               if (includeZeroDegreeVertices) {
+                       vertexDegrees = input.getVertices()
+                               .leftOuterJoin(vertexDegrees)
+                               .where(0)
+                               .equalTo(0)
+                               .with(new JoinVertexWithVertexDegrees<K, VV>())
+                                       .setParallelism(parallelism)
+                                       .name("Join zero degree vertices");
+               }
+
+               return vertexDegrees;
+       }
+
+       /**
+        * Emit each edge both forward and reversed with the associated 
bitmask.
+        *
+        * @param <T> ID type
+        * @param <TV> edge value type
+        */
+       private static class EmitAndFlipEdge<T, TV>
+       implements FlatMapFunction<Edge<T, TV>, Tuple3<T, T, ByteValue>> {
+               private Tuple3<T, T, ByteValue> forward = new Tuple3<>(null, 
null, new ByteValue(EdgeOrder.FORWARD.getBitmask()));
+
+               private Tuple3<T, T, ByteValue> reverse = new Tuple3<>(null, 
null, new ByteValue(EdgeOrder.REVERSE.getBitmask()));
+
+               @Override
+               public void flatMap(Edge<T, TV> value, Collector<Tuple3<T, T, 
ByteValue>> out)
+                               throws Exception {
+                       forward.f0 = value.f0;
+                       forward.f1 = value.f1;
+                       out.collect(forward);
+
+                       reverse.f0 = value.f1;
+                       reverse.f1 = value.f0;
+                       out.collect(reverse);
+               }
+       }
+
+       /**
+        * Combine mutual edges.
+        *
+        * @param <T> ID type
+        */
+       private static class ReduceBitmask<T>
+       implements ReduceFunction<Tuple3<T, T, ByteValue>> {
+               @Override
+               public Tuple3<T, T, ByteValue> reduce(Tuple3<T, T, ByteValue> 
left, Tuple3<T, T, ByteValue> right)
+                               throws Exception {
+                       left.f2.setValue((byte)(left.f2.getValue() | 
right.f2.getValue()));
+                       return left;
+               }
+       }
+
+       /**
+        * Sum vertex degree by counting over mutual, out-, and in-edges.
+        *
+        * @param <T> ID type
+        */
+       private static class DegreeCount<T>
+       implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Vertex<T, 
Degrees>> {
+               private Vertex<T, Degrees> output = new Vertex<>(null, new 
Degrees());
+
+               @Override
+               public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, 
Collector<Vertex<T, Degrees>> out)
+                               throws Exception {
+                       long degree = 0;
+                       long outDegree = 0;
+                       long inDegree = 0;
+
+                       for (Tuple3<T, T, ByteValue> edge : values) {
+                               output.f0 = edge.f0;
+
+                               byte bitmask = edge.f2.getValue();
+
+                               degree++;
+
+                               if (bitmask == EdgeOrder.FORWARD.getBitmask()) {
+                                       outDegree++;
+                               } else if (bitmask == 
EdgeOrder.REVERSE.getBitmask()) {
+                                       inDegree++;
+                               } else {
+                                       outDegree++;
+                                       inDegree++;
+                               }
+                       }
+
+                       output.f1.getDegree().setValue(degree);
+                       output.f1.getOutDegree().setValue(outDegree);
+                       output.f1.getInDegree().setValue(inDegree);
+
+                       out.collect(output);
+               }
+       }
+
+       /**
+        * Performs a left outer join to apply a zero count for vertices with
+        * out- and in-degree of zero.
+        *
+        * @param <T> ID type
+        * @param <TV> vertex value type
+        */
+       @ForwardedFieldsFirst("0")
+       @ForwardedFieldsSecond("0")
+       private static class JoinVertexWithVertexDegrees<T, TV>
+       implements JoinFunction<Vertex<T, TV>, Vertex<T, Degrees>, Vertex<T, 
Degrees>> {
+               private Vertex<T, Degrees> output = new Vertex<>(null, new 
Degrees());
+
+               @Override
+               public Vertex<T, Degrees> join(Vertex<T, TV> vertex, Vertex<T, 
Degrees> vertexDegree)
+                               throws Exception {
+                       if (vertexDegree == null) {
+                               output.f0 = vertex.f0;
+                               return output;
+                       } else {
+                               return vertexDegree;
+                       }
+               }
+       }
+
+       /**
+        * Wraps the vertex degree, out-degree, and in-degree.
+        */
+       public static class Degrees
+       extends Tuple3<LongValue, LongValue, LongValue> {
+               private static final int HASH_SEED = 0x3a12fc31;
+
+               private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+
+               public Degrees() {
+                       this(new LongValue(), new LongValue(), new LongValue());
+               }
+
+               public Degrees(LongValue value0, LongValue value1, LongValue 
value2) {
+                       super(value0, value1, value2);
+               }
+
+               public LongValue getDegree() {
+                       return f0;
+               }
+
+               public LongValue getOutDegree() {
+                       return f1;
+               }
+
+               public LongValue getInDegree() {
+                       return f2;
+               }
+
+               @Override
+               public int hashCode() {
+                       return hasher.reset()
+                               .hash(f0.getValue())
+                               .hash(f1.getValue())
+                               .hash(f2.getValue())
+                               .hash();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 5825628..00acedb 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -39,7 +39,7 @@ public class VertexInDegree<K, VV, EV>
 implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 
        // Optional configuration
-       private boolean includeZeroDegreeVertices = true;
+       private boolean includeZeroDegreeVertices = false;
 
        private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
 
@@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .setParallelism(parallelism)
                                .name("Map edge to target ID");
 
-               // t, deg(t)
+               // t, d(t)
                DataSet<Vertex<K, LongValue>> targetDegree = targetIds
                        .groupBy(0)
                        .reduce(new DegreeCount<K>())
@@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .name("Degree count");
 
                if (includeZeroDegreeVertices) {
-                       targetDegree = input
-                               .getVertices()
+                       targetDegree = input.getVertices()
                                .leftOuterJoin(targetDegree)
                                .where(0)
                                .equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 2676104..bcca19d 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -39,7 +39,7 @@ public class VertexOutDegree<K, VV, EV>
 implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 
        // Optional configuration
-       private boolean includeZeroDegreeVertices = true;
+       private boolean includeZeroDegreeVertices = false;
 
        private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
 
@@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .setParallelism(parallelism)
                                .name("Map edge to source ID");
 
-               // s, deg(s)
+               // s, d(s)
                DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
                        .groupBy(0)
                        .reduce(new DegreeCount<K>())
@@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .name("Degree count");
 
                if (includeZeroDegreeVertices) {
-                       sourceDegree = input
-                               .getVertices()
+                       sourceDegree = input.getVertices()
                                .leftOuterJoin(sourceDegree)
                                .where(0)
                                .equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
index f25fd90..a138b1a 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
@@ -24,18 +24,25 @@
  * equivalent to the out-degree.
  *
  * The undirected graph algorithms are:
- *   {@code VertexDegree}     annotates vertices as <v, deg(v)>
- *   {@code EdgeSourceDegree} annotates edges as <s, t, deg(s)>
- *   {@code EdgeTargetDegree} annotates edges as <s, t, deg(t)>
- *   {@code EdgeDegreePair}   annotates edges as <s, t, (deg(s), deg(t))>
+ *   {@code VertexDegree}      annotates vertices as <v, deg(v)>
+ *   {@code EdgeSourceDegree}  annotates edges as <s, t, (EV, deg(s))>
+ *   {@code EdgeTargetDegree}  annotates edges as <s, t, (EV, deg(t))>
+ *   {@code EdgeDegreePair}    annotates edges as <s, t, (EV, deg(s), deg(t))>
  *
  * The directed graph algorithms are:
- *   {@code VertexOutDegree}  annotates vertices as <v, out(v)>
- *   {@code VertexInDegree}   annotates vertices as <v, in(v)>
- *   {@code VertexDegreePair} annotates vertices as <v, (out(v), in(v))>
+ *   {@code VertexDegrees}     annotates vertices as <v, (deg(v), out(v), 
in(v))>
+ *   {@code VertexOutDegree}   annotates vertices as <v, out(v)>
+ *   {@code VertexInDegree}    annotates vertices as <v, in(v)>
+ *   {@code EdgeSourceDegrees} annotates edges as <s, t, (deg(s), out(s), 
in(s))>
+ *   {@code EdgeTargetDegrees} annotates edges as <s, t, (deg(t), out(t), 
in(t))>
+ *   {@code EdgeDegreesPair}   annotates edges as <s, t, ((deg(s), out(s), 
in(s)), (deg(t), out(t), in(t)))>
  *
- * A directed graph edge has four possible degrees: source out- and in-degree
- * and target out- and in-degree. This gives 2^4 - 1 = 15 ways to annotate
- * a directed edge.
+ * where:
+ *   EV is the original edge value
+ *   deg(x) is the number of vertex neighbors
+ *   out(x) is the number of vertex neighbors connected by an out-edge
+ *   in(x) is the number of vertex neighbors connected by an in-edge
+ *
+ * (out(x) + in(x)) / 2 <= deg(x) <= out(x) + in(x)
  */
 package org.apache.flink.graph.asm.degree.annotate;

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 698a46a..8cc2e08 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -31,7 +31,8 @@ import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
 import org.apache.flink.types.LongValue;
 
 /**
- * Annotates edges of an undirected graph with the degree of both the source 
and target degree ID.
+ * Annotates edges of an undirected graph with the degree of both the source
+ * and target vertices.
  *
  * @param <K> ID type
  * @param <VV> vertex value type
@@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple3<EV, LongValue, LongV
        @Override
        public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, 
VV, EV> input)
                        throws Exception {
-               // s, t, deg(s)
+               // s, t, d(s)
                DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = 
input
                        .run(new EdgeSourceDegree<K, VV, EV>()
                                .setReduceOnTargetId(reduceOnTargetId)
@@ -87,12 +88,12 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple3<EV, LongValue, LongV
                                .setReduceOnTargetId(reduceOnTargetId)
                                .setParallelism(parallelism));
 
-               // s, t, (deg(s), deg(t))
+               // s, t, (d(s), d(t))
                return edgeSourceDegrees
                        .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
                        .where(1)
                        .equalTo(0)
-                       .with(new JoinEdgeDegreeWithVertexDegree<K,EV>())
+                       .with(new JoinEdgeDegreeWithVertexDegree<K, EV, 
LongValue>())
                                .setParallelism(parallelism)
                                .name("Edge target degree");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 6f64bc1..b9b59c5 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
 import org.apache.flink.types.LongValue;
 
 /**
- * Annotates edges of an undirected graph with degree of the source ID.
+ * Annotates edges of an undirected graph with degree of the source vertex.
  *
  * @param <K> ID type
  * @param <VV> vertex value type
@@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, LongValue>>>> {
                        .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
                        .where(0)
                        .equalTo(0)
-                       .with(new JoinEdgeWithVertexDegree<K, EV>())
+                       .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
                                .setParallelism(parallelism)
                                .name("Edge source degree");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index e9505bc..eabfee7 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
 import org.apache.flink.types.LongValue;
 
 /**
- * Annotates edges of an undirected graph with degree of the target ID.
+ * Annotates edges of an undirected graph with degree of the target vertex.
  *
  * @param <K> ID type
  * @param <VV> vertex value type
@@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, LongValue>>>> {
                        .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
                        .where(1)
                        .equalTo(0)
-                       .with(new JoinEdgeWithVertexDegree<K, EV>())
+                       .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
                                .setParallelism(parallelism)
                                .name("Edge target degree");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 518afaa..61a5e82 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -111,8 +111,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .name("Degree count");
 
                if (includeZeroDegreeVertices) {
-                       degree = input
-                               .getVertices()
+                       degree = input.getVertices()
                                .leftOuterJoin(degree)
                                .where(0)
                                .equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
new file mode 100644
index 0000000..3fcb9dd
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeDegreesPairTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               String expectedResult =
+                       "(0,1,((null),(2,2,0),(3,2,1)))\n" +
+                       "(0,2,((null),(2,2,0),(3,1,2)))\n" +
+                       "(1,2,((null),(3,2,1),(3,1,2)))\n" +
+                       "(1,3,((null),(3,2,1),(4,2,2)))\n" +
+                       "(2,3,((null),(3,1,2),(4,2,2)))\n" +
+                       "(3,4,((null),(4,2,2),(1,0,1)))\n" +
+                       "(3,5,((null),(4,2,2),(1,0,1)))";
+
+               DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> 
degrees = directedSimpleGraph
+                       .run(new EdgeDegreesPair<IntValue, NullValue, 
NullValue>());
+
+               TestBaseUtils.compareResultAsText(degrees.collect(), 
expectedResult);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               ChecksumHashCode degreeChecksum = 
DataSetUtils.checksumHashCode(directedRMatGraph
+                       .run(new EdgeDegreesPair<LongValue, NullValue, 
NullValue>()));
+
+               assertEquals(16384, degreeChecksum.getCount());
+               assertEquals(0x00001f68dfabd17cL, degreeChecksum.getChecksum());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
new file mode 100644
index 0000000..2b22eea
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeSourceDegreesTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               String expectedResult =
+                       "(0,1,((null),(2,2,0)))\n" +
+                       "(0,2,((null),(2,2,0)))\n" +
+                       "(1,2,((null),(3,2,1)))\n" +
+                       "(1,3,((null),(3,2,1)))\n" +
+                       "(2,3,((null),(3,1,2)))\n" +
+                       "(3,4,((null),(4,2,2)))\n" +
+                       "(3,5,((null),(4,2,2)))";
+
+               DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = 
directedSimpleGraph
+                               .run(new EdgeSourceDegrees<IntValue, NullValue, 
NullValue>());
+
+               TestBaseUtils.compareResultAsText(degrees.collect(), 
expectedResult);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               ChecksumHashCode sourceDegreeChecksum = 
DataSetUtils.checksumHashCode(directedRMatGraph
+                       .run(new EdgeSourceDegrees<LongValue, NullValue, 
NullValue>()));
+
+               assertEquals(16384, sourceDegreeChecksum.getCount());
+               assertEquals(0x00001ec53bd55136L, 
sourceDegreeChecksum.getChecksum());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
new file mode 100644
index 0000000..6840dc5
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeTargetDegreesTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               String expectedResult =
+                       "(0,1,((null),(3,2,1)))\n" +
+                       "(0,2,((null),(3,1,2)))\n" +
+                       "(1,2,((null),(3,1,2)))\n" +
+                       "(1,3,((null),(4,2,2)))\n" +
+                       "(2,3,((null),(4,2,2)))\n" +
+                       "(3,4,((null),(1,0,1)))\n" +
+                       "(3,5,((null),(1,0,1)))";
+
+               DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
+                               .run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
+
+               TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+                       .run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
+
+               assertEquals(16384, targetDegreeChecksum.getCount());
+               assertEquals(0x00001f2867ba8b4fL, targetDegreeChecksum.getChecksum());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
deleted file mode 100644
index 9400532..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.degree.annotate.directed;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class VertexDegreePairTest
-extends AsmTestBase {
-
-       @Test
-       public void testWithSimpleGraph()
-                       throws Exception {
-               DataSet<Vertex<IntValue, Tuple2<LongValue, LongValue>>> vertexDegrees = directedSimpleGraph
-                       .run(new VertexDegreePair<IntValue, NullValue, NullValue>());
-
-               String expectedResult =
-                       "(0,(2,0))\n" +
-                       "(1,(2,1))\n" +
-                       "(2,(1,2))\n" +
-                       "(3,(2,2))\n" +
-                       "(4,(0,1))\n" +
-                       "(5,(0,1))";
-
-               TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
-       }
-
-       @Test
-       public void testWithEmptyGraph()
-                       throws Exception {
-               DataSet<Vertex<LongValue, Tuple2<LongValue, LongValue>>> vertexDegrees;
-
-               vertexDegrees = emptyGraph
-                       .run(new VertexDegreePair<LongValue, NullValue, NullValue>()
-                               .setIncludeZeroDegreeVertices(false));
-
-               assertEquals(0, vertexDegrees.collect().size());
-
-               vertexDegrees = emptyGraph
-                       .run(new VertexDegreePair<LongValue, NullValue, NullValue>()
-                               .setIncludeZeroDegreeVertices(true));
-
-               String expectedResult =
-                       "(0,(0,0))\n" +
-                       "(1,(0,0))\n" +
-                       "(2,(0,0))";
-
-               TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
-       }
-
-       @Test
-       public void testWithRMatGraph()
-       throws Exception {
-               ChecksumHashCode degreePairChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
-                       .run(new VertexDegreePair<LongValue, NullValue, NullValue>()));
-
-               assertEquals(902, degreePairChecksum.getCount());
-               assertEquals(0x0000000000fc025aL, degreePairChecksum.getChecksum());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
new file mode 100644
index 0000000..a0697a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VertexDegreesTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleDirectedGraph()
+                       throws Exception {
+               DataSet<Vertex<IntValue, Degrees>> vertexDegrees = directedSimpleGraph
+                       .run(new VertexDegrees<IntValue, NullValue, NullValue>());
+
+               String expectedResult =
+                       "(0,(2,2,0))\n" +
+                       "(1,(3,2,1))\n" +
+                       "(2,(3,1,2))\n" +
+                       "(3,(4,2,2))\n" +
+                       "(4,(1,0,1))\n" +
+                       "(5,(1,0,1))";
+
+               TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+       }
+
+       @Test
+       public void testWithSimpleUndirectedGraph()
+                       throws Exception {
+               DataSet<Vertex<IntValue, Degrees>> vertexDegrees = undirectedSimpleGraph
+                       .run(new VertexDegrees<IntValue, NullValue, NullValue>());
+
+               String expectedResult =
+                       "(0,(2,2,2))\n" +
+                       "(1,(3,3,3))\n" +
+                       "(2,(3,3,3))\n" +
+                       "(3,(4,4,4))\n" +
+                       "(4,(1,1,1))\n" +
+                       "(5,(1,1,1))";
+
+               TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+       }
+
+       @Test
+       public void testWithEmptyGraph()
+                       throws Exception {
+               DataSet<Vertex<LongValue, Degrees>> vertexDegrees;
+
+               vertexDegrees = emptyGraph
+                       .run(new VertexDegrees<LongValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(false));
+
+               assertEquals(0, vertexDegrees.collect().size());
+
+               vertexDegrees = emptyGraph
+                       .run(new VertexDegrees<LongValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
+
+               String expectedResult =
+                       "(0,(0,0,0))\n" +
+                       "(1,(0,0,0))\n" +
+                       "(2,(0,0,0))";
+
+               TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+       throws Exception {
+               ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+                       .run(new VertexDegrees<LongValue, NullValue, NullValue>()));
+
+               assertEquals(902, degreeChecksum.getCount());
+               assertEquals(0x0000015384f40cb6L, degreeChecksum.getChecksum());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index 577e675..0fa0fe5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.AsmTestBase;
@@ -39,7 +38,8 @@ extends AsmTestBase {
        public void testWithSimpleGraph()
                        throws Exception {
                DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
-                       .run(new VertexInDegree<IntValue, NullValue, NullValue>());
+                       .run(new VertexInDegree<IntValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
 
                String expectedResult =
                        "(0,0)\n" +
@@ -79,7 +79,8 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                ChecksumHashCode inDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
-                       .run(new VertexInDegree<LongValue, NullValue, NullValue>()));
+                       .run(new VertexInDegree<LongValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(true)));
 
                assertEquals(902, inDegreeChecksum.getCount());
                assertEquals(0x0000000000e1e99cL, inDegreeChecksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index b5e9ce8..f7f3d48 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -38,7 +38,8 @@ extends AsmTestBase {
        public void testWithSimpleGraph()
                        throws Exception {
                DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
-                       .run(new VertexOutDegree<IntValue, NullValue, NullValue>());
+                       .run(new VertexOutDegree<IntValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(true));
 
                String expectedResult =
                        "(0,2)\n" +
@@ -78,7 +79,8 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                ChecksumHashCode outDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
-                       .run(new VertexOutDegree<LongValue, NullValue, NullValue>()));
+                       .run(new VertexOutDegree<LongValue, NullValue, NullValue>()
+                               .setIncludeZeroDegreeVertices(true)));
 
                assertEquals(902, outDegreeChecksum.getCount());
                assertEquals(0x0000000000e1e99cL, outDegreeChecksum.getChecksum());

Reply via email to