[FLINK-3945] [gelly] Degree annotation for directed graphs This closes #2021
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65545c2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65545c2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65545c2e Branch: refs/heads/master Commit: 65545c2ed46c17df6abd85299b91bad2529cd42c Parents: 36ad78c Author: Greg Hogan <[email protected]> Authored: Fri May 20 12:54:16 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Thu Jun 2 10:22:29 2016 -0400 ---------------------------------------------------------------------- docs/apis/batch/libs/gelly.md | 55 +++- .../java/org/apache/flink/graph/EdgeOrder.java | 47 ++++ .../annotate/DegreeAnnotationFunctions.java | 88 +------ .../annotate/directed/EdgeDegreesPair.java | 81 ++++++ .../annotate/directed/EdgeSourceDegrees.java | 75 ++++++ .../annotate/directed/EdgeTargetDegrees.java | 75 ++++++ .../annotate/directed/VertexDegreePair.java | 107 -------- .../degree/annotate/directed/VertexDegrees.java | 262 +++++++++++++++++++ .../annotate/directed/VertexInDegree.java | 7 +- .../annotate/directed/VertexOutDegree.java | 7 +- .../graph/asm/degree/annotate/package-info.java | 27 +- .../annotate/undirected/EdgeDegreePair.java | 9 +- .../annotate/undirected/EdgeSourceDegree.java | 4 +- .../annotate/undirected/EdgeTargetDegree.java | 4 +- .../annotate/undirected/VertexDegree.java | 3 +- .../annotate/directed/EdgeDegreesPairTest.java | 66 +++++ .../directed/EdgeSourceDegreesTest.java | 66 +++++ .../directed/EdgeTargetDegreesTest.java | 66 +++++ .../annotate/directed/VertexDegreePairTest.java | 87 ------ .../annotate/directed/VertexDegreesTest.java | 104 ++++++++ .../annotate/directed/VertexInDegreeTest.java | 7 +- .../annotate/directed/VertexOutDegreeTest.java | 6 +- 22 files changed, 947 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/docs/apis/batch/libs/gelly.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index aadbd44..05fbcb5 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2164,12 +2164,12 @@ DataSet<Vertex<K, LongValue>> outDegree = graph </tr> <tr> - <td>degree.annotate.directed.<br/><strong>VertexDegreePair</strong></td> + <td>degree.annotate.directed.<br/><strong>VertexDegrees</strong></td> <td> - <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with both the out-degree and in-degree.</p> + <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree.</p> {% highlight java %} -DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> pairDegree = graph - .run(new VertexDegreePair() +DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degrees = graph + .run(new VertexDegrees() .setIncludeZeroDegreeVertices(true)); {% endhighlight %} <p>Optional configuration:</p> @@ -2181,6 +2181,51 @@ DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> pairDegree = graph </tr> <tr> + <td>degree.annotate.directed.<br/><strong>EdgeSourceDegrees</strong></td> + <td> + <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the source ID.</p> +{% highlight java %} +DataSet<Edge<K, Tuple2<EV, Degrees>>> sourceDegrees = graph + .run(new EdgeSourceDegrees()); +{% endhighlight %} + <p>Optional configuration:</p> + <ul> + <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li> + </ul> + </td> + </tr> + + <tr> + <td>degree.annotate.directed.<br/><strong>EdgeTargetDegrees</strong></td> + <td> + <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the target ID.</p> +{% highlight java %} +DataSet<Edge<K, Tuple2<EV, Degrees>>> targetDegrees = graph + .run(new EdgeTargetDegrees(); +{% endhighlight %} + <p>Optional configuration:</p> + <ul> + <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li> + </ul> + </td> + </tr> + + <tr> + <td>degree.annotate.directed.<br/><strong>EdgeDegreesPair</strong></td> + <td> + <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of both the source and target vertices.</p> +{% highlight java %} +DataSet<Edge<K, Tuple2<EV, Degrees>>> degrees = graph + .run(new EdgeDegreesPair()); +{% endhighlight %} + <p>Optional configuration:</p> + <ul> + <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li> + </ul> + </td> + </tr> + + <tr> <td>degree.annotate.undirected.<br/><strong>VertexDegree</strong></td> <td> <p>Annotate vertices of an <a href="#graph-representation">undirected graph</a> with the degree.</p> @@ -2236,7 +2281,7 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph <tr> <td>degree.annotate.undirected.<br/><strong>EdgeDegreePair</strong></td> <td> - <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with the degree of both the source and target degree ID.</p> + <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with the degree of both the source and target vertices.</p> {% highlight java %} DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph .run(new EdgeDegreePair() http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java new file mode 100644 index 0000000..08e955a --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +/** + * These bitmasks are used by edge-flipping algorithms to mark the edge order + * relative to the original edge direction. + */ +public enum EdgeOrder { + + FORWARD(0b01), + REVERSE(0b10), + MUTUAL(0b11); + + private final byte bitmask; + + EdgeOrder(int bitmask) { + this.bitmask = (byte)bitmask; + } + + /** + * Returns a bitmask used for marking whether an edge is in the same + * direction as in the original edge set (FORWARD), is flipped relative + * to the original edge set (REVERSE), or both (MUTUAL). + * + * @return edge order bitmask + */ + public byte getBitmask() { + return bitmask; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java index 098e9fe..b91b4cb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java @@ -98,7 +98,7 @@ public class DegreeAnnotationFunctions { */ @ForwardedFieldsFirst("0") @ForwardedFieldsSecond("0") - public static final class JoinVertexWithVertexDegree<K, VV> + public static class JoinVertexWithVertexDegree<K, VV> implements JoinFunction<Vertex<K, VV>, Vertex<K, LongValue>, Vertex<K, LongValue>> { private LongValue zero = new LongValue(0); @@ -114,68 +114,6 @@ public class DegreeAnnotationFunctions { } } - /** - * Performs a left outer join to apply a zero count for vertices with - * out- and in-degree of zero. - * - * @param <K> ID type - * @param <VV> vertex value type - */ - @ForwardedFieldsFirst("0") - @ForwardedFieldsSecond("0") - public static final class JoinVertexWithVertexDegrees<K, VV> - implements JoinFunction<Vertex<K, VV>, Vertex<K, Tuple2<LongValue, LongValue>>, Vertex<K, Tuple2<LongValue, LongValue>>> { - private Tuple2<LongValue, LongValue> zeros; - - private Vertex<K, Tuple2<LongValue, LongValue>> output = new Vertex<>(); - - public JoinVertexWithVertexDegrees() { - LongValue zero = new LongValue(0); - zeros = new Tuple2<>(zero, zero); - } - @Override - public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, VV> vertex, Vertex<K, Tuple2<LongValue, LongValue>> vertexDegree) - throws Exception { - output.f0 = vertex.f0; - output.f1 = (vertexDegree == null) ? zeros : vertexDegree.f1; - - return output; - } - } - - /** - * Performs a full outer join composing vertex out- and in-degree and - * applying a zero count for vertices having an out- or in-degree of zero. - * - * @param <K> ID type - */ - @ForwardedFieldsFirst("0") - @ForwardedFieldsSecond("0") - public static final class JoinVertexDegreeWithVertexDegree<K> - implements JoinFunction<Vertex<K, LongValue>, Vertex<K, LongValue>, Vertex<K, Tuple2<LongValue, LongValue>>> { - private LongValue zero = new LongValue(0); - - private Tuple2<LongValue, LongValue> degrees = new Tuple2<>(); - - private Vertex<K, Tuple2<LongValue, LongValue>> output = new Vertex<>(null, degrees); - - @Override - public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, LongValue> left, Vertex<K, LongValue> right) - throws Exception { - if (left == null) { - output.f0 = right.f0; - degrees.f0 = zero; - degrees.f1 = right.f1; - } else { - output.f0 = left.f0; - degrees.f0 = left.f1; - degrees.f1 = (right == null) ? zero : right.f1; - } - - return output; - } - } - // -------------------------------------------------------------------------------------------- // Edge functions // -------------------------------------------------------------------------------------------- @@ -185,17 +123,18 @@ public class DegreeAnnotationFunctions { * * @param <K> ID type * @param <EV> edge value type + * @param <D> degree type */ @ForwardedFieldsFirst("0; 1; 2->2.0") @ForwardedFieldsSecond("0; 1->2.1") - public static final class JoinEdgeWithVertexDegree<K, EV> - implements JoinFunction<Edge<K, EV>, Vertex<K, LongValue>, Edge<K, Tuple2<EV, LongValue>>> { - private Tuple2<EV, LongValue> valueAndDegree = new Tuple2<>(); + public static class JoinEdgeWithVertexDegree<K, EV, D> + implements JoinFunction<Edge<K, EV>, Vertex<K, D>, Edge<K, Tuple2<EV, D>>> { + private Tuple2<EV, D> valueAndDegree = new Tuple2<>(); - private Edge<K, Tuple2<EV, LongValue>> output = new Edge<>(null, null, valueAndDegree); + private Edge<K, Tuple2<EV, D>> output = new Edge<>(null, null, valueAndDegree); @Override - public Edge<K, Tuple2<EV, LongValue>> join(Edge<K, EV> edge, Vertex<K, LongValue> vertex) throws Exception { + public Edge<K, Tuple2<EV, D>> join(Edge<K, EV> edge, Vertex<K, D> vertex) throws Exception { output.f0 = edge.f0; output.f1 = edge.f1; valueAndDegree.f0 = edge.f2; @@ -210,19 +149,20 @@ public class DegreeAnnotationFunctions { * * @param <K> ID type * @param <EV> edge value type + * @param <D> degree type */ @ForwardedFieldsFirst("0; 1; 2.0; 2.1") @ForwardedFieldsSecond("0; 1->2.2") - public static final class JoinEdgeDegreeWithVertexDegree<K, EV> - implements JoinFunction<Edge<K, Tuple2<EV, LongValue>>, Vertex<K, LongValue>, Edge<K, Tuple3<EV, LongValue, LongValue>>> { - private Tuple3<EV, LongValue, LongValue> valueAndDegrees = new Tuple3<>(); + public static class JoinEdgeDegreeWithVertexDegree<K, EV, D> + implements JoinFunction<Edge<K, Tuple2<EV, D>>, Vertex<K, D>, Edge<K, Tuple3<EV, D, D>>> { + private Tuple3<EV, D, D> valueAndDegrees = new Tuple3<>(); - private Edge<K, Tuple3<EV, LongValue, LongValue>> output = new Edge<>(null, null, valueAndDegrees); + private Edge<K, Tuple3<EV, D, D>> output = new Edge<>(null, null, valueAndDegrees); @Override - public Edge<K, Tuple3<EV, LongValue, LongValue>> join(Edge<K, Tuple2<EV, LongValue>> edge, Vertex<K, LongValue> vertex) + public Edge<K, Tuple3<EV, D, D>> join(Edge<K, Tuple2<EV, D>> edge, Vertex<K, D> vertex) throws Exception { - Tuple2<EV, LongValue> valueAndDegree = edge.f2; + Tuple2<EV, D> valueAndDegree = edge.f2; output.f0 = edge.f0; output.f1 = edge.f1; http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java new file mode 100644 index 0000000..5aa5ca4 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; + +/** + * Annotates edges of a directed graph with the degree, out-degree, and + * in-degree of both the source and target vertices. + * + * @param <K> ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class EdgeDegreesPair<K, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> { + + // Optional configuration + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input) + throws Exception { + // s, t, d(s) + DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input + .run(new EdgeSourceDegrees<K, VV, EV>() + .setParallelism(parallelism)); + + // t, d(t) + DataSet<Vertex<K, Degrees>> vertexDegrees = input + .run(new VertexDegrees<K, VV, EV>() + .setParallelism(parallelism)); + + // s, t, (d(s), d(t)) + return edgeSourceDegrees + .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) + .where(1) + .equalTo(0) + .with(new JoinEdgeDegreeWithVertexDegree<K, EV, Degrees>()) + .setParallelism(parallelism) + .name("Edge target degree"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java new file mode 100644 index 0000000..85ba0ed --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; + +/** + * Annotates edges of a directed graph with the degree, out-degree, and + * in-degree of the source vertex. + * + * @param <K> ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class EdgeSourceDegrees<K, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> { + + // Optional configuration + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public EdgeSourceDegrees<K, VV, EV> setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input) + throws Exception { + // s, d(s) + DataSet<Vertex<K, Degrees>> vertexDegrees = input + .run(new VertexDegrees<K, VV, EV>() + .setParallelism(parallelism)); + + // s, t, d(s) + return input.getEdges() + .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) + .where(0) + .equalTo(0) + .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>()) + .setParallelism(parallelism) + .name("Edge source degrees"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java new file mode 100644 index 0000000..6d72e44 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; + +/** + * Annotates edges of a directed graph with the degree, out-degree, and + * in-degree of the target vertex. + * + * @param <K> ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class EdgeTargetDegrees<K, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> { + + // Optional configuration + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public EdgeTargetDegrees<K, VV, EV> setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input) + throws Exception { + // t, d(t) + DataSet<Vertex<K, Degrees>> vertexDegrees = input + .run(new VertexDegrees<K, VV, EV>() + .setParallelism(parallelism)); + + // s, t, d(t) + return input.getEdges() + .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) + .where(1) + .equalTo(0) + .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>()) + .setParallelism(parallelism) + .name("Edge target degrees"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java deleted file mode 100644 index 3e2cae0..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.asm.degree.annotate.directed; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexDegreeWithVertexDegree; -import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegrees; -import org.apache.flink.types.LongValue; - -/** - * Annotates vertices of a directed graph with both the out-degree and in-degree. - * - * @param <K> ID type - * @param <VV> vertex value type - * @param <EV> edge value type - */ -public class VertexDegreePair<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<LongValue, LongValue>>>> { - - // Optional configuration - private boolean includeZeroDegreeVertices = false; - - private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; - - /** - * By default only the edge set is processed for the computation of degree. - * When this flag is set an additional join is performed against the vertex - * set in order to output vertices with out- and in-degree of zero. - * - * @param includeZeroDegreeVertices whether to output vertices with out- - * and in-degree of zero - * @return this - */ - public VertexDegreePair<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { - this.includeZeroDegreeVertices = includeZeroDegreeVertices; - - return this; - } - - /** - * Override the operator parallelism. - * - * @param parallelism operator parallelism - * @return this - */ - public VertexDegreePair<K, VV, EV> setParallelism(int parallelism) { - this.parallelism = parallelism; - - return this; - } - - @Override - public DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> run(Graph<K, VV, EV> input) - throws Exception { - // s, deg(s) - DataSet<Vertex<K, LongValue>> outDegree = input - .run(new VertexOutDegree<K, VV, EV>() - .setIncludeZeroDegreeVertices(false)); - - // t, deg(t) - DataSet<Vertex<K, LongValue>> inDegree = input - .run(new VertexInDegree<K, VV, EV>() - .setIncludeZeroDegreeVertices(false)); - - DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degree = outDegree - .fullOuterJoin(inDegree) - .where(0) - .equalTo(0) - .with(new JoinVertexDegreeWithVertexDegree<K>()) - .setParallelism(parallelism) - .name("Join out- and in-degree"); - - if (includeZeroDegreeVertices) { - degree = input - .getVertices() - .leftOuterJoin(degree) - .where(0) - .equalTo(0) - .with(new JoinVertexWithVertexDegrees<K, VV>()) - .setParallelism(parallelism) - .name("Join zero degree vertices"); - } - - return degree; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java new file mode 100644 index 0000000..aab0eb6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeOrder; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; + +/** + * Annotates vertices of a directed graph with the degree, out-, and in-degree. + * + * @param <K> graph label type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class VertexDegrees<K, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> { + + // Optional configuration + private boolean includeZeroDegreeVertices = false; + + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * By default only the edge set is processed for the computation of degree. + * When this flag is set an additional join is performed against the vertex + * set in order to output vertices with an in-degree of zero. + * + * @param includeZeroDegreeVertices whether to output vertices with an + * in-degree of zero + * @return this + */ + public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { + this.includeZeroDegreeVertices = includeZeroDegreeVertices; + + return this; + } + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public VertexDegrees<K, VV, EV> setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input) + throws Exception { + // s, t, bitmask + DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges() + .flatMap(new EmitAndFlipEdge<K, EV>()) + .setParallelism(parallelism) + .name("Emit and flip edge") + .groupBy(0, 1) + .reduce(new ReduceBitmask<K>()) + .setParallelism(parallelism) + .name("Reduce bitmask"); + + // s, d(s) + DataSet<Vertex<K, Degrees>> vertexDegrees = edgesWithOrder + .groupBy(0) + .sortGroup(1, Order.ASCENDING) + .reduceGroup(new DegreeCount<K>()) + .setParallelism(parallelism) + .name("Degree count"); + + if (includeZeroDegreeVertices) { + vertexDegrees = input.getVertices() + .leftOuterJoin(vertexDegrees) + .where(0) + .equalTo(0) + .with(new JoinVertexWithVertexDegrees<K, VV>()) + .setParallelism(parallelism) + .name("Join zero degree vertices"); + } + + return vertexDegrees; + } + + /** + * Emit each vertex both forward and reversed with the associated bitmask. + * + * @param <T> ID type + * @param <TV> vertex value type + */ + private static class EmitAndFlipEdge<T, TV> + implements FlatMapFunction<Edge<T, TV>, Tuple3<T, T, ByteValue>> { + private Tuple3<T, T, ByteValue> forward = new Tuple3<>(null, null, new ByteValue(EdgeOrder.FORWARD.getBitmask())); + + private Tuple3<T, T, ByteValue> reverse = new Tuple3<>(null, null, new ByteValue(EdgeOrder.REVERSE.getBitmask())); + + @Override + public void flatMap(Edge<T, TV> value, Collector<Tuple3<T, T, ByteValue>> out) + throws Exception { + forward.f0 = value.f0; + forward.f1 = value.f1; + out.collect(forward); + + reverse.f0 = value.f1; + reverse.f1 = value.f0; + out.collect(reverse); + } + } + + /** + * Combine mutual edges. + * + * @param <T> ID type + */ + private static class ReduceBitmask<T> + implements ReduceFunction<Tuple3<T, T, ByteValue>> { + @Override + public Tuple3<T, T, ByteValue> reduce(Tuple3<T, T, ByteValue> left, Tuple3<T, T, ByteValue> right) + throws Exception { + left.f2.setValue((byte)(left.f2.getValue() | right.f2.getValue())); + return left; + } + } + + /** + * Sum vertex degree by counting over mutual, out-, and in-edges. + * + * @param <T> ID type + */ + private static class DegreeCount<T> + implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Vertex<T, Degrees>> { + private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees()); + + @Override + public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Vertex<T, Degrees>> out) + throws Exception { + long degree = 0; + long outDegree = 0; + long inDegree = 0; + + for (Tuple3<T, T, ByteValue> edge : values) { + output.f0 = edge.f0; + + byte bitmask = edge.f2.getValue(); + + degree++; + + if (bitmask == EdgeOrder.FORWARD.getBitmask()) { + outDegree++; + } else if (bitmask == EdgeOrder.REVERSE.getBitmask()) { + inDegree++; + } else { + outDegree++; + inDegree++; + } + } + + output.f1.getDegree().setValue(degree); + output.f1.getOutDegree().setValue(outDegree); + output.f1.getInDegree().setValue(inDegree); + + out.collect(output); + } + } + + /** + * Performs a left outer join to apply a zero count for vertices with + * out- and in-degree of zero. + * + * @param <T> ID type + * @param <TV> vertex value type + */ + @ForwardedFieldsFirst("0") + @ForwardedFieldsSecond("0") + private static class JoinVertexWithVertexDegrees<T, TV> + implements JoinFunction<Vertex<T, TV>, Vertex<T, Degrees>, Vertex<T, Degrees>> { + private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees()); + + @Override + public Vertex<T, Degrees> join(Vertex<T, TV> vertex, Vertex<T, Degrees> vertexDegree) + throws Exception { + if (vertexDegree == null) { + output.f0 = vertex.f0; + return output; + } else { + return vertexDegree; + } + } + } + + /** + * Wraps the vertex degree, out-degree, and in-degree. + */ + public static class Degrees + extends Tuple3<LongValue, LongValue, LongValue> { + private static final int HASH_SEED = 0x3a12fc31; + + private Murmur3_32 hasher = new Murmur3_32(HASH_SEED); + + public Degrees() { + this(new LongValue(), new LongValue(), new LongValue()); + } + + public Degrees(LongValue value0, LongValue value1, LongValue value2) { + super(value0, value1, value2); + } + + public LongValue getDegree() { + return f0; + } + + public LongValue getOutDegree() { + return f1; + } + + public LongValue getInDegree() { + return f2; + } + + @Override + public int hashCode() { + return hasher.reset() + .hash(f0.getValue()) + .hash(f1.getValue()) + .hash(f2.getValue()) + .hash(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index 5825628..00acedb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -39,7 +39,7 @@ public class VertexInDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { // Optional configuration - private boolean includeZeroDegreeVertices = true; + private boolean includeZeroDegreeVertices = false; private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; @@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .setParallelism(parallelism) .name("Map edge to target ID"); - // t, deg(t) + // t, d(t) DataSet<Vertex<K, LongValue>> targetDegree = targetIds .groupBy(0) .reduce(new DegreeCount<K>()) @@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .name("Degree count"); if (includeZeroDegreeVertices) { - targetDegree = input - .getVertices() + targetDegree = input.getVertices() .leftOuterJoin(targetDegree) .where(0) .equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index 2676104..bcca19d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -39,7 +39,7 @@ public class VertexOutDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { // Optional configuration - private boolean includeZeroDegreeVertices = true; + private boolean includeZeroDegreeVertices = false; private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; @@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .setParallelism(parallelism) .name("Map edge to source ID"); - // s, deg(s) + // s, d(s) DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds .groupBy(0) .reduce(new DegreeCount<K>()) @@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .name("Degree count"); if (includeZeroDegreeVertices) { - sourceDegree = input - .getVertices() + sourceDegree = input.getVertices() .leftOuterJoin(sourceDegree) .where(0) .equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java index f25fd90..a138b1a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java @@ -24,18 +24,25 @@ * equivalent to the out-degree. * * The undirected graph algorithms are: - * {@code VertexDegree} annotates vertices as <v, deg(v)> - * {@code EdgeSourceDegree} annotates edges as <s, t, deg(s)> - * {@code EdgeTargetDegree} annotates edges as <s, t, deg(t)> - * {@code EdgeDegreePair} annotates edges as <s, t, (deg(s), deg(t))> + * {@code VertexDegree} annotates vertices as <v, deg(v)> + * {@code EdgeSourceDegree} annotates edges as <s, t, (EV, deg(s))> + * {@code EdgeTargetDegree} annotates edges as <s, t, (EV, deg(t))> + * {@code EdgeDegreePair} annotates edges as <s, t, (EV, deg(s), deg(t))> * * The directed graph algorithms are: - * {@code VertexOutDegree} annotates vertices as <v, out(v)> - * {@code VertexInDegree} annotates vertices as <v, in(v)> - * {@code VertexDegreePair} annotates vertices as <v, (out(v), in(v))> + * {@code VertexDegrees} annotates vertices as <v, (deg(v), out(v), in(v))> + * {@code VertexOutDegree} annotates vertices as <v, out(v)> + * {@code VertexInDegree} annotates vertices as <v, in(v)> + * {@code EdgeSourceDegrees} annotates edges as <s, t, (deg(s), out(s), in(s))> + * {@code EdgeTargetDegrees} annotates edges as <s, t, (deg(t), out(t), in(t))> + * {@code EdgeDegreesPair} annotates edges as <s, t, ((deg(s), out(s), in(s)), (deg(t), out(t), in(t)))> * - * A directed graph edge has four possible degrees: source out- and in-degree - * and target out- and in-degree. This gives 2^4 - 1 = 15 ways to annotate - * a directed edge. + * where: + * EV is the original edge value + * deg(x) is the number of vertex neighbors + * out(x) is the number of vertex neighbors connected by an out-edge + * in(x) is the number of vertex neighbors connected by an in-edge + * + * (out(x) + in(x)) / 2 <= deg(x) <= out(x) + in(x) */ package org.apache.flink.graph.asm.degree.annotate; http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index 698a46a..8cc2e08 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -31,7 +31,8 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join import org.apache.flink.types.LongValue; /** - * Annotates edges of an undirected graph with the degree of both the source and target degree ID. + * Annotates edges of an undirected graph with the degree of both the source + * and target degree vertices. * * @param <K> ID type * @param <VV> vertex value type @@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV @Override public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, VV, EV> input) throws Exception { - // s, t, deg(s) + // s, t, d(s) DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = input .run(new EdgeSourceDegree<K, VV, EV>() .setReduceOnTargetId(reduceOnTargetId) @@ -87,12 +88,12 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV .setReduceOnTargetId(reduceOnTargetId) .setParallelism(parallelism)); - // s, t, (deg(s), deg(t)) + // s, t, (d(s), d(t)) return edgeSourceDegrees .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) .where(1) .equalTo(0) - .with(new JoinEdgeDegreeWithVertexDegree<K,EV>()) + .with(new JoinEdgeDegreeWithVertexDegree<K, EV, LongValue>()) .setParallelism(parallelism) .name("Edge target degree"); } http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index 6f64bc1..b9b59c5 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -30,7 +30,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join import org.apache.flink.types.LongValue; /** - * Annotates edges of an undirected graph with degree of the source ID. + * Annotates edges of an undirected graph with degree of the source vertex. * * @param <K> ID type * @param <VV> vertex value type @@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) .where(0) .equalTo(0) - .with(new JoinEdgeWithVertexDegree<K, EV>()) + .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>()) .setParallelism(parallelism) .name("Edge source degree"); } http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index e9505bc..eabfee7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -30,7 +30,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join import org.apache.flink.types.LongValue; /** - * Annotates edges of an undirected graph with degree of the target ID. + * Annotates edges of an undirected graph with degree of the target vertex. * * @param <K> ID type * @param <VV> vertex value type @@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) .where(1) .equalTo(0) - .with(new JoinEdgeWithVertexDegree<K, EV>()) + .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>()) .setParallelism(parallelism) .name("Edge target degree"); } http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index 518afaa..61a5e82 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -111,8 +111,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .name("Degree count"); if (includeZeroDegreeVertices) { - degree = input - .getVertices() + degree = input.getVertices() .leftOuterJoin(degree) .where(0) .equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java new file mode 100644 index 0000000..3fcb9dd --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EdgeDegreesPairTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + String expectedResult = + "(0,1,((null),(2,2,0),(3,2,1)))\n" + + "(0,2,((null),(2,2,0),(3,1,2)))\n" + + "(1,2,((null),(3,2,1),(3,1,2)))\n" + + "(1,3,((null),(3,2,1),(4,2,2)))\n" + + "(2,3,((null),(3,1,2),(4,2,2)))\n" + + "(3,4,((null),(4,2,2),(1,0,1)))\n" + + "(3,5,((null),(4,2,2),(1,0,1)))"; + + DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degrees = directedSimpleGraph + .run(new EdgeDegreesPair<IntValue, NullValue, NullValue>()); + + TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); + } + + @Test + public void testWithRMatGraph() + throws Exception { + ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph + .run(new EdgeDegreesPair<LongValue, NullValue, NullValue>())); + + assertEquals(16384, degreeChecksum.getCount()); + assertEquals(0x00001f68dfabd17cL, degreeChecksum.getChecksum()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java new file mode 100644 index 0000000..2b22eea --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EdgeSourceDegreesTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + String expectedResult = + "(0,1,((null),(2,2,0)))\n" + + "(0,2,((null),(2,2,0)))\n" + + "(1,2,((null),(3,2,1)))\n" + + "(1,3,((null),(3,2,1)))\n" + + "(2,3,((null),(3,1,2)))\n" + + "(3,4,((null),(4,2,2)))\n" + + "(3,5,((null),(4,2,2)))"; + + DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph + .run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>()); + + TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); + } + + @Test + public void testWithRMatGraph() + throws Exception { + ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph + .run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>())); + + assertEquals(16384, sourceDegreeChecksum.getCount()); + assertEquals(0x00001ec53bd55136L, sourceDegreeChecksum.getChecksum()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java new file mode 100644 index 0000000..6840dc5 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EdgeTargetDegreesTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + String expectedResult = + "(0,1,((null),(3,2,1)))\n" + + "(0,2,((null),(3,1,2)))\n" + + "(1,2,((null),(3,1,2)))\n" + + "(1,3,((null),(4,2,2)))\n" + + "(2,3,((null),(4,2,2)))\n" + + "(3,4,((null),(1,0,1)))\n" + + "(3,5,((null),(1,0,1)))"; + + DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph + .run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>()); + + TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); + } + + @Test + public void testWithRMatGraph() + throws Exception { + ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph + .run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>())); + + assertEquals(16384, targetDegreeChecksum.getCount()); + assertEquals(0x00001f2867ba8b4fL, targetDegreeChecksum.getChecksum()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java deleted file mode 100644 index 9400532..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.asm.degree.annotate.directed; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.asm.AsmTestBase; -import org.apache.flink.test.util.TestBaseUtils; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class VertexDegreePairTest -extends AsmTestBase { - - @Test - public void testWithSimpleGraph() - throws Exception { - DataSet<Vertex<IntValue, Tuple2<LongValue, LongValue>>> vertexDegrees = directedSimpleGraph - .run(new VertexDegreePair<IntValue, NullValue, NullValue>()); - - String expectedResult = - "(0,(2,0))\n" + - "(1,(2,1))\n" + - "(2,(1,2))\n" + - "(3,(2,2))\n" + - "(4,(0,1))\n" + - "(5,(0,1))"; - - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); - } - - @Test - public void testWithEmptyGraph() - throws Exception { - DataSet<Vertex<LongValue, Tuple2<LongValue, LongValue>>> vertexDegrees; - - vertexDegrees = emptyGraph - .run(new VertexDegreePair<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false)); - - assertEquals(0, vertexDegrees.collect().size()); - - vertexDegrees = emptyGraph - .run(new VertexDegreePair<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(true)); - - String expectedResult = - "(0,(0,0))\n" + - "(1,(0,0))\n" + - "(2,(0,0))"; - - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); - } - - @Test - public void testWithRMatGraph() - throws Exception { - ChecksumHashCode degreePairChecksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new VertexDegreePair<LongValue, NullValue, NullValue>())); - - assertEquals(902, degreePairChecksum.getCount()); - assertEquals(0x0000000000fc025aL, degreePairChecksum.getChecksum()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java new file mode 100644 index 0000000..a0697a2 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class VertexDegreesTest +extends AsmTestBase { + + @Test + public void testWithSimpleDirectedGraph() + throws Exception { + DataSet<Vertex<IntValue, Degrees>> vertexDegrees = directedSimpleGraph + .run(new VertexDegrees<IntValue, NullValue, NullValue>()); + + String expectedResult = + "(0,(2,2,0))\n" + + "(1,(3,2,1))\n" + + "(2,(3,1,2))\n" + + "(3,(4,2,2))\n" + + "(4,(1,0,1))\n" + + "(5,(1,0,1))"; + + TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + } + + @Test + public void testWithSimpleUndirectedGraph() + throws Exception { + DataSet<Vertex<IntValue, Degrees>> vertexDegrees = undirectedSimpleGraph + .run(new VertexDegrees<IntValue, NullValue, NullValue>()); + + String expectedResult = + "(0,(2,2,2))\n" + + "(1,(3,3,3))\n" + + "(2,(3,3,3))\n" + + "(3,(4,4,4))\n" + + "(4,(1,1,1))\n" + + "(5,(1,1,1))"; + + TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + DataSet<Vertex<LongValue, Degrees>> vertexDegrees; + + vertexDegrees = emptyGraph + .run(new VertexDegrees<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(false)); + + assertEquals(0, vertexDegrees.collect().size()); + + vertexDegrees = emptyGraph + .run(new VertexDegrees<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); + + String expectedResult = + "(0,(0,0,0))\n" + + "(1,(0,0,0))\n" + + "(2,(0,0,0))"; + + TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + } + + @Test + public void testWithRMatGraph() + throws Exception { + ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph + .run(new VertexDegrees<LongValue, NullValue, NullValue>())); + + assertEquals(902, degreeChecksum.getCount()); + assertEquals(0x0000015384f40cb6L, degreeChecksum.getChecksum()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java index 577e675..0fa0fe5 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java @@ -20,7 +20,6 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.AsmTestBase; @@ -39,7 +38,8 @@ extends AsmTestBase { public void testWithSimpleGraph() throws Exception { DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph - .run(new VertexInDegree<IntValue, NullValue, NullValue>()); + .run(new VertexInDegree<IntValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); String expectedResult = "(0,0)\n" + @@ -79,7 +79,8 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { ChecksumHashCode inDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new VertexInDegree<LongValue, NullValue, NullValue>())); + .run(new VertexInDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true))); assertEquals(902, inDegreeChecksum.getCount()); assertEquals(0x0000000000e1e99cL, inDegreeChecksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java index b5e9ce8..f7f3d48 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java @@ -38,7 +38,8 @@ extends AsmTestBase { public void testWithSimpleGraph() throws Exception { DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph - .run(new VertexOutDegree<IntValue, NullValue, NullValue>()); + .run(new VertexOutDegree<IntValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); String expectedResult = "(0,2)\n" + @@ -78,7 +79,8 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { ChecksumHashCode outDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new VertexOutDegree<LongValue, NullValue, NullValue>())); + .run(new VertexOutDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true))); assertEquals(902, outDegreeChecksum.getCount()); assertEquals(0x0000000000e1e99cL, outDegreeChecksum.getChecksum());
