[FLINK-7273] [gelly] Gelly tests with empty graphs There exist some tests with empty graphs but the `EmptyGraph` in `AsmTestBase` contained vertices but no edges. Add a new `EmptyGraph` without vertices and test both empty graphs for each algorithm.
EmptyGraph now generates the proper TypeInformation (for Edge<> not Tuple3) which had prevented adding edges due to a union incompatibility. GraphGeneratorUtils#vertexSet now uses a hash-combine for distinct. `PageRank` optionally includes zero-degree vertices in the results (at a performance cost). This closes #4405 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9437a0ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9437a0ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9437a0ff Branch: refs/heads/master Commit: 9437a0ffc04318f6a1a2d19c59f2ae6651b26507 Parents: 6612c0e Author: Greg Hogan <c...@greghogan.com> Authored: Wed Jul 26 06:23:52 2017 -0400 Committer: Greg Hogan <c...@greghogan.com> Committed: Thu Sep 14 23:25:25 2017 -0400 ---------------------------------------------------------------------- docs/dev/libs/gelly/library_methods.md | 1 + .../apache/flink/graph/drivers/PageRank.java | 6 +- .../flink/graph/drivers/input/GridGraph.java | 1 - .../flink/graph/generator/CirculantGraph.java | 2 +- .../flink/graph/generator/CompleteGraph.java | 2 +- .../flink/graph/generator/CycleGraph.java | 2 +- .../apache/flink/graph/generator/EchoGraph.java | 4 +- .../flink/graph/generator/EmptyGraph.java | 14 ++- .../graph/generator/GraphGeneratorUtils.java | 46 ++++++--- .../apache/flink/graph/generator/GridGraph.java | 2 +- .../flink/graph/generator/HypercubeGraph.java | 4 +- .../apache/flink/graph/generator/PathGraph.java | 2 +- .../graph/generator/SingletonEdgeGraph.java | 6 +- .../apache/flink/graph/generator/StarGraph.java | 8 +- .../directed/AverageClusteringCoefficient.java | 5 + .../directed/GlobalClusteringCoefficient.java | 5 + .../clustering/directed/TriadicCensus.java | 5 + .../AverageClusteringCoefficient.java | 5 + .../undirected/GlobalClusteringCoefficient.java | 5 + .../clustering/undirected/TriadicCensus.java | 5 + .../flink/graph/library/linkanalysis/HITS.java | 12 ++- .../graph/library/linkanalysis/PageRank.java | 50 ++++++++- .../library/metric/directed/EdgeMetrics.java | 5 + .../library/metric/directed/VertexMetrics.java | 5 + .../library/metric/undirected/EdgeMetrics.java | 5 + .../metric/undirected/VertexMetrics.java | 5 + .../graph/library/similarity/AdamicAdar.java | 4 +- .../org/apache/flink/graph/asm/AsmTestBase.java | 25 ++++- .../graph/asm/dataset/ChecksumHashCodeTest.java | 19 +++- .../flink/graph/asm/dataset/CollectTest.java | 19 +++- .../flink/graph/asm/dataset/CountTest.java | 18 +++- .../annotate/directed/EdgeDegreesPairTest.java | 25 +++-- .../directed/EdgeSourceDegreesTest.java | 25 +++-- .../directed/EdgeTargetDegreesTest.java | 25 +++-- .../annotate/directed/VertexDegreesTest.java | 40 ++++--- .../annotate/directed/VertexInDegreeTest.java | 52 +++++++--- .../annotate/directed/VertexOutDegreeTest.java | 52 +++++++--- .../annotate/undirected/EdgeDegreePairTest.java | 45 ++++++-- .../undirected/EdgeSourceDegreeTest.java | 45 ++++++-- .../undirected/EdgeTargetDegreeTest.java | 45 ++++++-- .../annotate/undirected/VertexDegreeTest.java | 45 +++++--- .../filter/undirected/MaximumDegreeTest.java | 28 +++-- .../graph/asm/simple/directed/SimplifyTest.java | 3 +- .../asm/simple/undirected/SimplifyTest.java | 6 +- .../graph/asm/translate/TranslateTest.java | 9 +- .../graph/generator/CirculantGraphTest.java | 12 +-- .../graph/generator/CompleteGraphTest.java | 12 +-- .../flink/graph/generator/CycleGraphTest.java | 12 +-- .../flink/graph/generator/EchoGraphTest.java | 24 ++--- .../flink/graph/generator/EmptyGraphTest.java | 12 +-- .../flink/graph/generator/GridGraphTest.java | 12 +-- .../graph/generator/HypercubeGraphTest.java | 12 +-- .../flink/graph/generator/PathGraphTest.java | 12 +-- .../flink/graph/generator/RMatGraphTest.java | 9 +- .../graph/generator/SingletonEdgeGraphTest.java | 12 +-- .../flink/graph/generator/StarGraphTest.java | 12 +-- .../apache/flink/graph/generator/TestUtils.java | 9 +- .../AverageClusteringCoefficientTest.java | 74 ++++++------- .../GlobalClusteringCoefficientTest.java | 73 ++++++------- .../LocalClusteringCoefficientTest.java | 60 +++++++---- .../clustering/directed/TriadicCensusTest.java | 34 +++--- .../directed/TriangleListingTest.java | 28 +++-- .../AverageClusteringCoefficientTest.java | 74 ++++++------- .../GlobalClusteringCoefficientTest.java | 73 ++++++------- .../LocalClusteringCoefficientTest.java | 60 +++++++---- .../undirected/TriadicCensusTest.java | 28 +++-- .../undirected/TriangleListingTest.java | 31 ++++-- .../graph/library/linkanalysis/HITSTest.java | 103 +++++++++++-------- .../library/linkanalysis/PageRankTest.java | 97 ++++++++++------- .../library/metric/ChecksumHashCodeTest.java | 27 ++++- .../metric/directed/EdgeMetricsTest.java | 70 ++++++------- .../metric/directed/VertexMetricsTest.java | 95 ++++++++--------- .../metric/undirected/EdgeMetricsTest.java | 70 ++++++------- .../metric/undirected/VertexMetricsTest.java | 95 ++++++++--------- .../library/similarity/AdamicAdarTest.java | 76 +++++++++++--- .../library/similarity/JaccardIndexTest.java | 89 ++++++++++++---- 76 files changed, 1320 insertions(+), 829 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/docs/dev/libs/gelly/library_methods.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md index de53aaf..93a2c5d 100644 --- a/docs/dev/libs/gelly/library_methods.md +++ b/docs/dev/libs/gelly/library_methods.md @@ -330,6 +330,7 @@ The algorithm takes a simple directed graph as input and outputs a `DataSet` of hub score, and authority score. Termination is configured by the number of iterations and/or a convergence threshold on the iteration sum of the change in scores over all vertices. +* `setIncludeZeroDegreeVertices`: whether to include zero-degree vertices in the iterative computation * `setParallelism`: override the operator parallelism ### PageRank http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java index 299aeed..b9997e4 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java @@ -20,13 +20,14 @@ package org.apache.flink.graph.drivers; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.BooleanParameter; import org.apache.flink.graph.drivers.parameter.DoubleParameter; import org.apache.flink.graph.drivers.parameter.IterationConvergence; import org.apache.commons.lang3.text.StrBuilder; /** - * @see org.apache.flink.graph.library.linkanalysis.PageRank + * Driver for {@link org.apache.flink.graph.library.linkanalysis.PageRank}. */ public class PageRank<K, VV, EV> extends DriverBase<K, VV, EV> { @@ -40,6 +41,8 @@ extends DriverBase<K, VV, EV> { private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS); + private BooleanParameter includeZeroDegreeVertices = new BooleanParameter(this, "__include_zero_degree_vertices"); + @Override public String getShortDescription() { return "score vertices by the number and quality of incoming links"; @@ -63,6 +66,7 @@ extends DriverBase<K, VV, EV> { dampingFactor.getValue(), iterationConvergence.getValue().iterations, iterationConvergence.getValue().convergenceThreshold) + .setIncludeZeroDegreeVertices(includeZeroDegreeVertices.getValue()) .setParallelism(parallelism.getValue().intValue())); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java index 1421b1d..fdb144f 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java @@ -76,7 +76,6 @@ extends GeneratedGraph { @Override protected long vertexCount() { - // in Java 8 use Math.multiplyExact(long, long) BigInteger vertexCount = BigInteger.ONE; for (Dimension dimension : dimensions) { vertexCount = vertexCount.multiply(BigInteger.valueOf(dimension.size)); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index 1dc9e66..0cb3edd 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -51,7 +51,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long vertexCount; + private final long vertexCount; private List<OffsetRange> offsetRanges = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index bf7dedf..46d8e67 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long vertexCount; + private final long vertexCount; /** * An undirected {@link Graph} connecting every distinct pair of vertices. http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java index 5b61fa8..5dad4c8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java @@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long vertexCount; + private final long vertexCount; /** * An undirected {@link Graph} with {@code n} vertices where each vertex http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java index d834df1..4baeb25 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java @@ -46,9 +46,9 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long vertexCount; + private final long vertexCount; - private long vertexDegree; + private final long vertexDegree; /** * An undirected {@link Graph} whose vertices have the same degree. http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java index 466e2d3..30d0d2f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java @@ -18,12 +18,11 @@ package org.apache.flink.graph.generator; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -39,13 +38,13 @@ import java.util.Collections; public class EmptyGraph extends GraphGeneratorBase<LongValue, NullValue, NullValue> { - public static final int MINIMUM_VERTEX_COUNT = 1; + public static final int MINIMUM_VERTEX_COUNT = 0; // Required to create the DataSource private final ExecutionEnvironment env; // Required configuration - private long vertexCount; + private final long vertexCount; /** * The {@link Graph} containing no edges. @@ -63,15 +62,14 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { @Override public Graph<LongValue, NullValue, NullValue> generate() { + Preconditions.checkState(vertexCount >= 0); + // Vertices DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); // Edges - TypeInformation<Edge<LongValue, NullValue>> typeInformation = new TupleTypeInfo<>( - ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO); - DataSource<Edge<LongValue, NullValue>> edges = env - .fromCollection(Collections.<Edge<LongValue, NullValue>>emptyList(), typeInformation) + .fromCollection(Collections.<Edge<LongValue, NullValue>>emptyList(), TypeInformation.of(new TypeHint<Edge<LongValue, NullValue>>(){})) .setParallelism(parallelism) .name("Empty edge set"); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java index d5a70f3..61a27c4 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java @@ -20,6 +20,9 @@ package org.apache.flink.graph.generator; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; @@ -31,6 +34,9 @@ import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; /** * Utilities for graph generators. @@ -45,26 +51,34 @@ public class GraphGeneratorUtils { * @param env the Flink execution environment. * @param parallelism operator parallelism * @param vertexCount number of sequential vertex labels - * @return {@link DataSet} of sequentially labeled {@link Vertex Vertices} + * @return {@link DataSet} of sequentially labeled {@link Vertex vertices} */ public static DataSet<Vertex<LongValue, NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) { - LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1); - - DataSource<LongValue> vertexLabels = env - .fromParallelCollection(iterator, LongValue.class) - .setParallelism(parallelism) - .name("Vertex iterators"); - - return vertexLabels - .map(new CreateVertex()) - .setParallelism(parallelism) - .name("Vertex sequence"); + Preconditions.checkArgument(vertexCount >= 0, "Vertex count must be non-negative"); + + if (vertexCount == 0) { + return env + .fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Vertex<LongValue, NullValue>>(){})) + .setParallelism(parallelism) + .name("Empty vertex set"); + } else { + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1); + + DataSource<LongValue> vertexLabels = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Vertex indices"); + + return vertexLabels + .map(new CreateVertex()) + .setParallelism(parallelism) + .name("Vertex sequence"); + } } @ForwardedFields("*->f0") private static class CreateVertex implements MapFunction<LongValue, Vertex<LongValue, NullValue>> { - private Vertex<LongValue, NullValue> vertex = new Vertex<>(null, NullValue.getInstance()); @Override @@ -79,13 +93,13 @@ public class GraphGeneratorUtils { // -------------------------------------------------------------------------------------------- /** - * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s. + * Generates {@link Vertex vertices} present in the given set of {@link Edge}s. * * @param edges source {@link DataSet} of {@link Edge}s * @param parallelism operator parallelism * @param <K> label type * @param <EV> edge value type - * @return {@link DataSet} of discovered {@link Vertex Vertices} + * @return {@link DataSet} of discovered {@link Vertex vertices} * * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment) */ @@ -97,6 +111,7 @@ public class GraphGeneratorUtils { return vertexSet .distinct() + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Emit vertex labels"); } @@ -106,7 +121,6 @@ public class GraphGeneratorUtils { */ private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> { - private Vertex<K, NullValue> output = new Vertex<>(null, new NullValue()); @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java index cae2bc4..1d91b53 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java @@ -71,7 +71,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { public GridGraph addDimension(long size, boolean wrapEndpoints) { Preconditions.checkArgument(size >= 2, "Dimension size must be at least 2"); - vertexCount *= size; + vertexCount = Math.multiplyExact(vertexCount, size); // prevent duplicate edges if (size == 2) { http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java index daaaf53..0dc95e7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java @@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long dimensions; + private final long dimensions; /** * An undirected {@code Graph} where edges form an n-dimensional hypercube. @@ -56,6 +56,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { @Override public Graph<LongValue, NullValue, NullValue> generate() { + Preconditions.checkState(dimensions > 0); + GridGraph graph = new GridGraph(env); for (int i = 0; i < dimensions; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java index e61fcd8..bae8633 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java @@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long vertexCount; + private final long vertexCount; /** * An undirected {@link Graph} with {@code n} vertices where each vertex http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java index 159e55d..333c051 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java @@ -42,7 +42,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long vertexPairCount; + private final long vertexPairCount; /** * An undirected {@link Graph} containing one or more isolated two-paths. @@ -62,8 +62,10 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { @Override public Graph<LongValue, NullValue, NullValue> generate() { + Preconditions.checkState(vertexPairCount > 0); + // Vertices - long vertexCount = 2 * this.vertexPairCount; + long vertexCount = 2 * vertexPairCount; DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java index 7133320..5cbb256 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java @@ -43,11 +43,11 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { private final ExecutionEnvironment env; // Required configuration - private long vertexCount; + private final long vertexCount; /** - * An undirected {@Graph} with {@code n} vertices where the single central - * node has degree {@code n-1}, connecting to the other {@code n-1} + * An undirected {@link Graph} with {@code n} vertices where the single + * central node has degree {@code n-1}, connecting to the other {@code n-1} * vertices which have degree {@code 1}. * * @param env the Flink execution environment @@ -63,6 +63,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { @Override public Graph<LongValue, NullValue, NullValue> generate() { + Preconditions.checkState(vertexCount >= 2); + // Vertices DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java index 2ff5dc2..1d3b7c6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java @@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { return "vertex count: " + vertexCount + ", average clustering coefficient: " + averageLocalClusteringCoefficient; http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java index a6b0baa..ae15d40 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java @@ -137,6 +137,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { return "triplet count: " + tripletCount + ", triangle count: " + triangleCount http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java index 93eadc5..f99df1c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java @@ -504,6 +504,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { NumberFormat nf = NumberFormat.getInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java index c0ddb05..2885d6e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java @@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { return "vertex count: " + vertexCount + ", average clustering coefficient: " + averageLocalClusteringCoefficient; http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java index ef212ae..1a7314e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java @@ -136,6 +136,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { return "triplet count: " + tripletCount + ", triangle count: " + triangleCount http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java index f440098..24dcade 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java @@ -198,6 +198,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { NumberFormat nf = NumberFormat.getInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java index e59240b..9ec1eea 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java @@ -48,6 +48,7 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; import java.util.Collection; +import java.util.Iterator; /** * Hyperlink-Induced Topic Search computes two interdependent scores for every @@ -387,12 +388,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { public void open(Configuration parameters) throws Exception { super.open(parameters); - Collection<DoubleValue> var; - var = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED); - hubbinessRootSumSquared = Math.sqrt(var.iterator().next().getValue()); + Collection<DoubleValue> hubbinessSumSquared = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED); + Iterator<DoubleValue> hubbinessSumSquaredIterator = hubbinessSumSquared.iterator(); + this.hubbinessRootSumSquared = hubbinessSumSquaredIterator.hasNext() ? Math.sqrt(hubbinessSumSquaredIterator.next().getValue()) : Double.NaN; - var = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED); - authorityRootSumSquared = Math.sqrt(var.iterator().next().getValue()); + Collection<DoubleValue> authoritySumSquared = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED); + Iterator<DoubleValue> authoritySumSquaredIterator = authoritySumSquared.iterator(); + authorityRootSumSquared = authoritySumSquaredIterator.hasNext() ? Math.sqrt(authoritySumSquaredIterator.next().getValue()) : Double.NaN; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java index 71c37aa..d259fac 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java @@ -84,6 +84,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { private double convergenceThreshold; + // Optional configuration + private boolean includeZeroDegreeVertices = false; + /** * PageRank with a fixed number of iterations. * @@ -126,6 +129,42 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { this.convergenceThreshold = convergenceThreshold; } + /** + * This PageRank implementation properly handles both source and sink + * vertices which have, respectively, only outgoing and incoming edges. + * + * <p>Setting this flag includes "zero-degree" vertices in the PageRank + * computation and result. These vertices are handled the same as other + * "source" vertices (with a consistent score of + * <code>(1 - damping factor) / number of vertices</code>) but only + * affect the scores of other vertices indirectly through the taking of + * this proportional portion of the "random jump" score. + * + * <p>The cost to include zero-degree vertices is a reduce for uniqueness + * on the vertex set followed by an outer join on the vertex degree + * DataSet. + * + * @param includeZeroDegreeVertices whether to include zero-degree vertices in the iterative computation + * @return this + */ + public PageRank<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { + this.includeZeroDegreeVertices = includeZeroDegreeVertices; + + return this; + } + + @Override + protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) { + if (!super.canMergeConfigurationWith(other)) { + return false; + } + + PageRank rhs = (PageRank) other; + + return dampingFactor == rhs.dampingFactor && + includeZeroDegreeVertices == rhs.includeZeroDegreeVertices; + } + @Override protected void mergeConfiguration(GraphAlgorithmWrappingBase other) { super.mergeConfiguration(other); @@ -142,6 +181,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // vertex degree DataSet<Vertex<K, Degrees>> vertexDegree = input .run(new VertexDegrees<K, VV, EV>() + .setIncludeZeroDegreeVertices(includeZeroDegreeVertices) .setParallelism(parallelism)); // vertex count @@ -158,7 +198,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // vertices with zero in-edges DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree .flatMap(new InitializeSourceVertices<>()) - .withBroadcastSet(vertexCount, VERTEX_COUNT) .setParallelism(parallelism) .name("Initialize source vertex scores"); @@ -285,7 +324,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { super.open(parameters); Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT); - output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue()); + Iterator<LongValue> vertexCountIterator = vertexCount.iterator(); + output.f1 = new DoubleValue(vertexCountIterator.hasNext() ? 1.0 / vertexCountIterator.next().getValue() : Double.NaN); } @Override @@ -376,11 +416,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { super.open(parameters); Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES); + Iterator<Tuple2<T, DoubleValue>> sumOfScoresIterator = sumOfScores.iterator(); // floating point precision error is also included in sumOfSinks - double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue(); + double sumOfSinks = 1 - (sumOfScoresIterator.hasNext() ? sumOfScoresIterator.next().f1.getValue() : 0); Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT); - this.vertexCount = vertexCount.iterator().next().getValue(); + Iterator<LongValue> vertexCountIterator = vertexCount.iterator(); + this.vertexCount = vertexCountIterator.hasNext() ? vertexCountIterator.next().getValue() : 0; this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount; } http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java index 7294fd1..796667f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java @@ -310,6 +310,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { NumberFormat nf = NumberFormat.getInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java index 97ee6fa..87c50ce 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java @@ -307,6 +307,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { NumberFormat nf = NumberFormat.getInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java index 8c520e6..3392d47 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java @@ -283,6 +283,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { NumberFormat nf = NumberFormat.getInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java index 1116149..39fec4a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -257,6 +257,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { } @Override + public String toString() { + return toPrintableString(); + } + + @Override public String toPrintableString() { NumberFormat nf = NumberFormat.getInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index 752e206..701e698 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -410,7 +410,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { @Override public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, Collector<Result<T>> out) throws Exception { - float sum = 0; + double sum = 0; Tuple3<T, T, FloatValue> edge = null; for (Tuple3<T, T, FloatValue> next : values) { @@ -421,7 +421,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { if (sum >= minimumScore) { output.setVertexId0(edge.f0); output.setVertexId1(edge.f1); - output.setAdamicAdarScore(sum); + output.setAdamicAdarScore((float) sum); out.collect(output); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java index 1afb5da..1b6acbc 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java @@ -24,6 +24,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.generator.CompleteGraph; import org.apache.flink.graph.generator.EmptyGraph; import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.StarGraph; import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -56,11 +57,17 @@ public class AsmTestBase { // empty graph protected final long emptyGraphVertexCount = 3; - protected Graph<LongValue, NullValue, NullValue> emptyGraph; + protected Graph<LongValue, NullValue, NullValue> emptyGraphWithVertices; + + protected Graph<LongValue, NullValue, NullValue> emptyGraphWithoutVertices; + + // star graph + protected final long starGraphVertexCount = 29; + + protected Graph<LongValue, NullValue, NullValue> starGraph; @Before - public void setup() - throws Exception { + public void setup() throws Exception { env = ExecutionEnvironment.createCollectionsEnvironment(); env.getConfig().enableObjectReuse(); @@ -89,8 +96,16 @@ public class AsmTestBase { completeGraph = new CompleteGraph(env, completeGraphVertexCount) .generate(); - // empty graph - emptyGraph = new EmptyGraph(env, emptyGraphVertexCount) + // empty graph with vertices but no edges + emptyGraphWithVertices = new EmptyGraph(env, emptyGraphVertexCount) + .generate(); + + // empty graph with no vertices or edges + emptyGraphWithoutVertices = new EmptyGraph(env, 0) + .generate(); + + // star graph + starGraph = new StarGraph(env, starGraphVertexCount) .generate(); } http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java index 7d82b80..a31ce2e 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.asm.dataset; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; @@ -27,6 +29,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; @@ -39,15 +42,13 @@ public class ChecksumHashCodeTest { private ExecutionEnvironment env; @Before - public void setup() - throws Exception { + public void setup() throws Exception { env = ExecutionEnvironment.createCollectionsEnvironment(); env.getConfig().enableObjectReuse(); } @Test - public void testChecksumHashCode() - throws Exception { + public void testList() throws Exception { List<Long> list = Arrays.asList(ArrayUtils.toObject( new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })); @@ -58,4 +59,14 @@ public class ChecksumHashCodeTest { assertEquals(list.size(), checksum.getCount()); assertEquals(list.size() * (list.size() - 1) / 2, checksum.getChecksum()); } + + @Test + public void testEmptyList() throws Exception { + DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){})); + + Checksum checksum = new ChecksumHashCode<Long>().run(dataset).execute(); + + assertEquals(0, checksum.getCount()); + assertEquals(0, checksum.getChecksum()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java index 29b454b..cfeadce 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.asm.dataset; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -26,9 +28,11 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; /** * Tests for {@link Collect}. @@ -38,15 +42,13 @@ public class CollectTest { private ExecutionEnvironment env; @Before - public void setup() - throws Exception { + public void setup() throws Exception { env = ExecutionEnvironment.createCollectionsEnvironment(); env.getConfig().enableObjectReuse(); } @Test - public void testCollect() - throws Exception { + public void testList() throws Exception { List<Long> list = Arrays.asList(ArrayUtils.toObject( new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })); @@ -56,4 +58,13 @@ public class CollectTest { assertArrayEquals(list.toArray(), collected.toArray()); } + + @Test + public void testEmptyList() throws Exception { + DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){})); + + List<Long> collected = new Collect<Long>().run(dataset).execute(); + + assertEquals(0, collected.size()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java index a1160ce..0167a5f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.asm.dataset; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -26,6 +28,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; @@ -38,15 +41,13 @@ public class CountTest { private ExecutionEnvironment env; @Before - public void setup() - throws Exception { + public void setup() throws Exception { env = ExecutionEnvironment.createCollectionsEnvironment(); env.getConfig().enableObjectReuse(); } @Test - public void testCount() - throws Exception { + public void testList() throws Exception { List<Long> list = Arrays.asList(ArrayUtils.toObject( new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })); @@ -56,4 +57,13 @@ public class CountTest { assertEquals(list.size(), count); } + + @Test + public void testEmptyList() throws Exception { + DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){})); + + long count = new Count<Long>().run(dataset).execute(); + + assertEquals(0, count); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java index 63bf133..08ba4aa 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java @@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link EdgeDegreesPair}. */ -public class EdgeDegreesPairTest -extends AsmTestBase { +public class EdgeDegreesPairTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { String expectedResult = "(0,1,((null),(2,2,0),(3,0,3)))\n" + "(0,2,((null),(2,2,0),(3,2,1)))\n" + @@ -59,8 +57,23 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = emptyGraphWithVertices + .run(new EdgeDegreesPair<>()); + + assertEquals(0, degreesPair.collect().size()); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = emptyGraphWithoutVertices + .run(new EdgeDegreesPair<>()); + + assertEquals(0, degreesPair.collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedRMatGraph(10, 16) .run(new EdgeDegreesPair<>()); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java index 967cfb2..884e2d5 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java @@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link EdgeSourceDegrees}. */ -public class EdgeSourceDegreesTest -extends AsmTestBase { +public class EdgeSourceDegreesTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { String expectedResult = "(0,1,((null),(2,2,0)))\n" + "(0,2,((null),(2,2,0)))\n" + @@ -59,8 +57,23 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = emptyGraphWithVertices + .run(new EdgeSourceDegrees<>()); + + assertEquals(0, sourceDegrees.collect().size()); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = emptyGraphWithoutVertices + .run(new EdgeSourceDegrees<>()); + + assertEquals(0, sourceDegrees.collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph(10, 16) .run(new EdgeSourceDegrees<>()); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java index abb76c4..5afd64c 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java @@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link EdgeTargetDegrees}. */ -public class EdgeTargetDegreesTest -extends AsmTestBase { +public class EdgeTargetDegreesTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { String expectedResult = "(0,1,((null),(3,0,3)))\n" + "(0,2,((null),(3,2,1)))\n" + @@ -59,8 +57,23 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = emptyGraphWithVertices + .run(new EdgeTargetDegrees<>()); + + assertEquals(0, targetDegrees.collect().size()); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = emptyGraphWithoutVertices + .run(new EdgeTargetDegrees<>()); + + assertEquals(0, targetDegrees.collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedRMatGraph(10, 16) .run(new EdgeTargetDegrees<>()); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java index 91f354f..7b89fe7 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java @@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link VertexDegrees}. */ -public class VertexDegreesTest -extends AsmTestBase { +public class VertexDegreesTest extends AsmTestBase { @Test - public void testWithSimpleDirectedGraph() - throws Exception { + public void testWithDirectedSimpleGraph() throws Exception { DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph .run(new VertexDegrees<>()); @@ -57,8 +55,7 @@ extends AsmTestBase { } @Test - public void testWithSimpleUndirectedGraph() - throws Exception { + public void testWithUndirectedSimpleGraph() throws Exception { DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph .run(new VertexDegrees<>()); @@ -74,17 +71,14 @@ extends AsmTestBase { } @Test - public void testWithEmptyGraph() - throws Exception { - DataSet<Vertex<LongValue, Degrees>> degrees; - - degrees = emptyGraph + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Vertex<LongValue, Degrees>> degreesWithoutZeroDegreeVertices = emptyGraphWithVertices .run(new VertexDegrees<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(false)); - assertEquals(0, degrees.collect().size()); + assertEquals(0, degreesWithoutZeroDegreeVertices.collect().size()); - degrees = emptyGraph + DataSet<Vertex<LongValue, Degrees>> degreesWithZeroDegreeVertices = emptyGraphWithVertices .run(new VertexDegrees<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); @@ -93,12 +87,26 @@ extends AsmTestBase { "(1,(0,0,0))\n" + "(2,(0,0,0))"; - TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degreesWithZeroDegreeVertices.collect(), expectedResult); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Vertex<LongValue, Degrees>> degreesWithoutZeroDegreeVertices = emptyGraphWithoutVertices + .run(new VertexDegrees<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(false)); + + assertEquals(0, degreesWithoutZeroDegreeVertices.collect().size()); + + DataSet<Vertex<LongValue, Degrees>> degreesWithZeroDegreeVertices = emptyGraphWithoutVertices + .run(new VertexDegrees<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); + + assertEquals(0, degreesWithZeroDegreeVertices.collect().size()); } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithRMatGraph() throws Exception { DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph(10, 16) .run(new VertexDegrees<>()); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java index f671cab..7f6c9e3 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java @@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link VertexInDegree}. */ -public class VertexInDegreeTest -extends AsmTestBase { +public class VertexInDegreeTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithDirectedSimpleGraph() throws Exception { DataSet<Vertex<IntValue, LongValue>> inDegree = directedSimpleGraph .run(new VertexInDegree<IntValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); @@ -57,17 +55,31 @@ extends AsmTestBase { } @Test - public void testWithEmptyGraph() - throws Exception { - DataSet<Vertex<LongValue, LongValue>> inDegree; + public void testWithUndirectedSimpleGraph() throws Exception { + DataSet<Vertex<IntValue, LongValue>> inDegree = undirectedSimpleGraph + .run(new VertexInDegree<IntValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); + + String expectedResult = + "(0,2)\n" + + "(1,3)\n" + + "(2,3)\n" + + "(3,4)\n" + + "(4,1)\n" + + "(5,1)"; - inDegree = emptyGraph + TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult); + } + + @Test + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Vertex<LongValue, LongValue>> inDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices .run(new VertexInDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(false)); - assertEquals(0, inDegree.collect().size()); + assertEquals(0, inDegreeWithoutZeroDegreeVertices.collect().size()); - inDegree = emptyGraph + DataSet<Vertex<LongValue, LongValue>> inDegreeWithZeroDegreeVertices = emptyGraphWithVertices .run(new VertexInDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); @@ -76,12 +88,26 @@ extends AsmTestBase { "(1,0)\n" + "(2,0)"; - TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult); + TestBaseUtils.compareResultAsText(inDegreeWithZeroDegreeVertices.collect(), expectedResult); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Vertex<LongValue, LongValue>> inDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices + .run(new VertexInDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(false)); + + assertEquals(0, inDegreeWithoutZeroDegreeVertices.collect().size()); + + DataSet<Vertex<LongValue, LongValue>> inDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices + .run(new VertexInDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); + + assertEquals(0, inDegreeWithZeroDegreeVertices.collect().size()); } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithRMatGraph() throws Exception { DataSet<Vertex<LongValue, LongValue>> inDegree = directedRMatGraph(10, 16) .run(new VertexInDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java index 1517f23..7031d8f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java @@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link VertexOutDegree}. */ -public class VertexOutDegreeTest -extends AsmTestBase { +public class VertexOutDegreeTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithDirectedSimpleGraph() throws Exception { DataSet<Vertex<IntValue, LongValue>> outDegree = directedSimpleGraph .run(new VertexOutDegree<IntValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); @@ -57,17 +55,31 @@ extends AsmTestBase { } @Test - public void testWithEmptyGraph() - throws Exception { - DataSet<Vertex<LongValue, LongValue>> outDegree; + public void testWithUndirectedSimpleGraph() throws Exception { + DataSet<Vertex<IntValue, LongValue>> outDegree = undirectedSimpleGraph + .run(new VertexOutDegree<IntValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); + + String expectedResult = + "(0,2)\n" + + "(1,3)\n" + + "(2,3)\n" + + "(3,4)\n" + + "(4,1)\n" + + "(5,1)"; - outDegree = emptyGraph + TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult); + } + + @Test + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Vertex<LongValue, LongValue>> outDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices .run(new VertexOutDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(false)); - assertEquals(0, outDegree.collect().size()); + assertEquals(0, outDegreeWithoutZeroDegreeVertices.collect().size()); - outDegree = emptyGraph + DataSet<Vertex<LongValue, LongValue>> outDegreeWithZeroDegreeVertices = emptyGraphWithVertices .run(new VertexOutDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); @@ -76,12 +88,26 @@ extends AsmTestBase { "(1,0)\n" + "(2,0)"; - TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult); + TestBaseUtils.compareResultAsText(outDegreeWithZeroDegreeVertices.collect(), expectedResult); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Vertex<LongValue, LongValue>> outDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices + .run(new VertexOutDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(false)); + + assertEquals(0, outDegreeWithoutZeroDegreeVertices.collect().size()); + + DataSet<Vertex<LongValue, LongValue>> outDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices + .run(new VertexOutDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); + + assertEquals(0, outDegreeWithZeroDegreeVertices.collect().size()); } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithRMatGraph() throws Exception { DataSet<Vertex<LongValue, LongValue>> outDegree = directedRMatGraph(10, 16) .run(new VertexOutDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java index 1cae2e7..95a89c5 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java @@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link EdgeDegreePair}. */ -public class EdgeDegreePairTest -extends AsmTestBase { +public class EdgeDegreePairTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { String expectedResult = "(0,1,((null),2,3))\n" + "(0,2,((null),2,3))\n" + @@ -59,7 +57,8 @@ extends AsmTestBase { "(5,3,((null),1,4))"; DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph - .run(new EdgeDegreePair<>()); + .run(new EdgeDegreePair<IntValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult); @@ -71,10 +70,40 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = emptyGraphWithVertices + .run(new EdgeDegreePair<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); + + assertEquals(0, degreePairOnSourceId.collect().size()); + + DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = emptyGraphWithVertices + .run(new EdgeDegreePair<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(true)); + + assertEquals(0, degreePairOnTargetId.collect().size()); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = emptyGraphWithoutVertices + .run(new EdgeDegreePair<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); + + assertEquals(0, degreePairOnSourceId.collect().size()); + + DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = emptyGraphWithoutVertices + .run(new EdgeDegreePair<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(true)); + + assertEquals(0, degreePairOnTargetId.collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph(10, 16) - .run(new EdgeDegreePair<>()); + .run(new EdgeDegreePair<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>() .run(degreePairOnSourceId) http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java index 2d8b2e3..3802b4f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java @@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link EdgeSourceDegree}. */ -public class EdgeSourceDegreeTest -extends AsmTestBase { +public class EdgeSourceDegreeTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { String expectedResult = "(0,1,((null),2))\n" + "(0,2,((null),2))\n" + @@ -59,7 +57,8 @@ extends AsmTestBase { "(5,3,((null),1))"; DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph - .run(new EdgeSourceDegree<>()); + .run(new EdgeSourceDegree<IntValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult); @@ -71,10 +70,40 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = emptyGraphWithVertices + .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); + + assertEquals(0, sourceDegreeOnSourceId.collect().size()); + + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = emptyGraphWithVertices + .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(true)); + + assertEquals(0, sourceDegreeOnTargetId.collect().size()); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = emptyGraphWithoutVertices + .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); + + assertEquals(0, sourceDegreeOnSourceId.collect().size()); + + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = emptyGraphWithoutVertices + .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(true)); + + assertEquals(0, sourceDegreeOnTargetId.collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph(10, 16) - .run(new EdgeSourceDegree<>()); + .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>() .run(sourceDegreeOnSourceId) http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java index a7c88a1..51f880b 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java @@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link EdgeTargetDegree}. */ -public class EdgeTargetDegreeTest -extends AsmTestBase { +public class EdgeTargetDegreeTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { String expectedResult = "(0,1,((null),3))\n" + "(0,2,((null),3))\n" + @@ -59,7 +57,8 @@ extends AsmTestBase { "(5,3,((null),4))"; DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph - .run(new EdgeTargetDegree<>()); + .run(new EdgeTargetDegree<IntValue, NullValue, NullValue>() + .setReduceOnSourceId(false)); TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult); @@ -71,10 +70,40 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = emptyGraphWithVertices + .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>() + .setReduceOnSourceId(false)); + + assertEquals(0, targetDegreeOnTargetId.collect().size()); + + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = emptyGraphWithVertices + .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>() + .setReduceOnSourceId(true)); + + assertEquals(0, targetDegreeOnSourceId.collect().size()); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = emptyGraphWithoutVertices + .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>() + .setReduceOnSourceId(false)); + + assertEquals(0, targetDegreeOnTargetId.collect().size()); + + DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = emptyGraphWithoutVertices + .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>() + .setReduceOnSourceId(true)); + + assertEquals(0, targetDegreeOnSourceId.collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph(10, 16) - .run(new EdgeSourceDegree<>()); + .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>() + .setReduceOnSourceId(false)); Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>() .run(targetDegreeOnTargetId) http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java index bc76bff..49f0007 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java @@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link VertexDegree}. */ -public class VertexDegreeTest -extends AsmTestBase { +public class VertexDegreeTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { String expectedResult = "(0,2)\n" + "(1,3)\n" + @@ -50,7 +48,8 @@ extends AsmTestBase { "(5,1)"; DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph - .run(new VertexDegree<>()); + .run(new VertexDegree<IntValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult); @@ -62,12 +61,12 @@ extends AsmTestBase { } @Test - public void testWithCompleteGraph() - throws Exception { + public void testWithCompleteGraph() throws Exception { long expectedDegree = completeGraphVertexCount - 1; DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph - .run(new VertexDegree<>()); + .run(new VertexDegree<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) { assertEquals(expectedDegree, vertex.getValue().getValue()); @@ -83,17 +82,16 @@ extends AsmTestBase { } @Test - public void testWithEmptyGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { DataSet<Vertex<LongValue, LongValue>> degree; - degree = emptyGraph + degree = emptyGraphWithVertices .run(new VertexDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(false)); assertEquals(0, degree.collect().size()); - degree = emptyGraph + degree = emptyGraphWithVertices .run(new VertexDegree<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true)); @@ -106,10 +104,27 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithoutVertices() throws Exception { + DataSet<Vertex<LongValue, LongValue>> degree; + + degree = emptyGraphWithoutVertices + .run(new VertexDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(false)); + + assertEquals(0, degree.collect().size()); + + degree = emptyGraphWithoutVertices + .run(new VertexDegree<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true)); + + assertEquals(0, degree.collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph(10, 16) - .run(new VertexDegree<>()); + .run(new VertexDegree<LongValue, NullValue, NullValue>() + .setReduceOnTargetId(false)); Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>() .run(degreeOnSourceId) http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java index 51e7712..f43be9c 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java @@ -24,6 +24,7 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.library.metric.ChecksumHashCode; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.junit.Test; @@ -33,12 +34,10 @@ import static org.junit.Assert.assertEquals; /** * Tests for {@link MaximumDegree}. */ -public class MaximumDegreeTest -extends AsmTestBase { +public class MaximumDegreeTest extends AsmTestBase { @Test - public void testWithSimpleGraph() - throws Exception { + public void testWithSimpleGraph() throws Exception { Graph<IntValue, NullValue, NullValue> graph = undirectedSimpleGraph .run(new MaximumDegree<>(3)); @@ -63,8 +62,25 @@ extends AsmTestBase { } @Test - public void testWithRMatGraph() - throws Exception { + public void testWithEmptyGraphWithVertices() throws Exception { + Graph<LongValue, NullValue, NullValue> graph = emptyGraphWithVertices + .run(new MaximumDegree<>(1)); + + assertEquals(emptyGraphVertexCount, graph.getVertices().collect().size()); + assertEquals(0, graph.getEdges().collect().size()); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + Graph<LongValue, NullValue, NullValue> graph = emptyGraphWithoutVertices + .run(new MaximumDegree<>(1)); + + assertEquals(0, graph.getVertices().collect().size()); + assertEquals(0, graph.getEdges().collect().size()); + } + + @Test + public void testWithRMatGraph() throws Exception { Checksum checksum = undirectedRMatGraph(10, 16) .run(new MaximumDegree<>(16)) .run(new ChecksumHashCode<>()) http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java index 751d030..f31c09a 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java @@ -62,8 +62,7 @@ public class SimplifyTest { } @Test - public void test() - throws Exception { + public void test() throws Exception { String expectedResult = "(0,1,(null))\n" + "(0,2,(null))\n" +