Repository: flink Updated Branches: refs/heads/master 6a6eeb9d5 -> f025c455f
[FLINK-4963] [gelly] Tabulate edge direction for directed VertexMetrics The current implementation simply counts edges. We can do one better and tabulate unidirectional (u:v but no v:u) and bidirectional edges (u:v and v:u). This is effectively the 'dyadic census'. This commit also makes edge metrics distinct from vertex metrics. Previously EdgeMetrics has always been a superset of VertexMetrics. This closes #2725 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f025c455 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f025c455 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f025c455 Branch: refs/heads/master Commit: f025c455f2a2e4bc5bd5bb39b47c3dc1d1e3b117 Parents: 6a6eeb9 Author: Greg Hogan <[email protected]> Authored: Fri Oct 28 12:28:41 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Tue Nov 8 09:41:41 2016 -0500 ---------------------------------------------------------------------- .../org/apache/flink/graph/AnalyticHelper.java | 79 +++++++ .../directed/AverageClusteringCoefficient.java | 46 ++-- .../AverageClusteringCoefficient.java | 46 ++-- .../library/metric/directed/EdgeMetrics.java | 219 +++---------------- .../library/metric/directed/VertexMetrics.java | 140 +++++++----- .../library/metric/undirected/EdgeMetrics.java | 184 +++------------- .../metric/undirected/VertexMetrics.java | 68 +++--- .../metric/directed/EdgeMetricsTest.java | 12 +- .../metric/directed/VertexMetricsTest.java | 13 +- .../metric/undirected/EdgeMetricsTest.java | 11 +- 10 files changed, 300 insertions(+), 518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java new file mode 100644 index 0000000..b07a8c3 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.AbstractID; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A {@link GraphAnalytic} computes over a DataSet and returns the results via + * Flink accumulators. This computation is cheaply performed in a terminating + * {@link RichOutputFormat}. + * + * This class simplifies the creation of analytic helpers by providing pass-through + * methods for adding and getting accumulators. Each accumulator name is prefixed + * with a random string since Flink accumulators share a per-job global namespace. 
+ * This class also provides empty implementations of {@link RichOutputFormat#configure} + * and {@link RichOutputFormat#open}. + * + * @param <T> element type + */ +public abstract class AnalyticHelper<T> +extends RichOutputFormat<T> { + + private static final String SEPARATOR = "-"; + + private String id = new AbstractID().toString(); + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(int taskNumber, int numTasks) throws IOException {} + + /** + * Adds an accumulator by prepending the given name with a random string. + * + * @param name The name of the accumulator + * @param accumulator The accumulator + * @param <V> Type of values that are added to the accumulator + * @param <A> Type of the accumulator result as it will be reported to the client + */ + public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) { + getRuntimeContext().addAccumulator(id + SEPARATOR + name, accumulator); + } + + /** + * Gets the accumulator with the given name. Returns {@code null}, if no accumulator with + * that name was produced. 
+ * + * @param accumulatorName The name of the accumulator + * @param <A> The generic type of the accumulator value + * @return The value of the accumulator with the given name + */ + public <A> A getAccumulator(ExecutionEnvironment env, String accumulatorName) { + return env.getLastJobExecutionResult().getAccumulatorResult(id + SEPARATOR + accumulatorName); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java index f589d04..c0a80d1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java @@ -20,17 +20,14 @@ package org.apache.flink.graph.library.clustering.directed; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.DataSet; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.AnalyticHelper; import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result; import org.apache.flink.types.CopyableValue; -import 
org.apache.flink.util.AbstractID; import java.io.IOException; @@ -47,7 +44,11 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Result> { - private String id = new AbstractID().toString(); + private static final String VERTEX_COUNT = "vertexCount"; + + private static final String SUM_OF_LOCAL_CLUSTERING_COEFFICIENT = "sumOfLocalClusteringCoefficient"; + + private AverageClusteringCoefficientHelper<K> averageClusteringCoefficientHelper; // Optional configuration private int littleParallelism = PARALLELISM_DEFAULT; @@ -80,8 +81,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .run(new LocalClusteringCoefficient<K, VV, EV>() .setLittleParallelism(littleParallelism)); + averageClusteringCoefficientHelper = new AverageClusteringCoefficientHelper<>(); + localClusteringCoefficient - .output(new AverageClusteringCoefficientHelper<K>(id)) + .output(averageClusteringCoefficientHelper) .name("Average clustering coefficient"); return this; @@ -89,10 +92,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public Result getResult() { - JobExecutionResult res = env.getLastJobExecutionResult(); - - long vertexCount = res.getAccumulatorResult(id + "-0"); - double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1"); + long vertexCount = averageClusteringCoefficientHelper.getAccumulator(env, VERTEX_COUNT); + double sumOfLocalClusteringCoefficient = averageClusteringCoefficientHelper.getAccumulator(env, SUM_OF_LOCAL_CLUSTERING_COEFFICIENT); return new Result(vertexCount, sumOfLocalClusteringCoefficient); } @@ -103,28 +104,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * @param <T> ID type */ private static class AverageClusteringCoefficientHelper<T> - extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> { - private final String id; - + extends 
AnalyticHelper<LocalClusteringCoefficient.Result<T>> { private long vertexCount; private double sumOfLocalClusteringCoefficient; - /** - * The unique id is required because Flink's accumulator namespace is - * shared among all operators. - * - * @param id unique string used for accumulator names - */ - public AverageClusteringCoefficientHelper(String id) { - this.id = id; - } - - @Override - public void configure(Configuration parameters) {} - - @Override - public void open(int taskNumber, int numTasks) throws IOException {} - @Override public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException { vertexCount++; @@ -138,8 +121,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public void close() throws IOException { - getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); - getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient)); + addAccumulator(VERTEX_COUNT, new LongCounter(vertexCount)); + addAccumulator(SUM_OF_LOCAL_CLUSTERING_COEFFICIENT, new DoubleCounter(sumOfLocalClusteringCoefficient)); } } @@ -148,7 +131,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { */ public static class Result { private long vertexCount; - private double averageLocalClusteringCoefficient; /** http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java index 03dbc71..3d4a88e 100644 --- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java @@ -20,17 +20,14 @@ package org.apache.flink.graph.library.clustering.undirected; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.DataSet; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.AnalyticHelper; import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result; import org.apache.flink.types.CopyableValue; -import org.apache.flink.util.AbstractID; import java.io.IOException; @@ -47,7 +44,11 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Result> { - private String id = new AbstractID().toString(); + private static final String VERTEX_COUNT = "vertexCount"; + + private static final String SUM_OF_LOCAL_CLUSTERING_COEFFICIENT = "sumOfLocalClusteringCoefficient"; + + private AverageClusteringCoefficientHelper<K> averageClusteringCoefficientHelper; // Optional configuration private int littleParallelism = PARALLELISM_DEFAULT; @@ -80,8 +81,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .run(new LocalClusteringCoefficient<K, VV, EV>() .setLittleParallelism(littleParallelism)); + averageClusteringCoefficientHelper = new 
AverageClusteringCoefficientHelper<>(); + localClusteringCoefficient - .output(new AverageClusteringCoefficientHelper<K>(id)) + .output(averageClusteringCoefficientHelper) .name("Average clustering coefficient"); return this; @@ -89,10 +92,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public Result getResult() { - JobExecutionResult res = env.getLastJobExecutionResult(); - - long vertexCount = res.getAccumulatorResult(id + "-0"); - double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1"); + long vertexCount = averageClusteringCoefficientHelper.getAccumulator(env, VERTEX_COUNT); + double sumOfLocalClusteringCoefficient = averageClusteringCoefficientHelper.getAccumulator(env, SUM_OF_LOCAL_CLUSTERING_COEFFICIENT); return new Result(vertexCount, sumOfLocalClusteringCoefficient); } @@ -103,28 +104,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * @param <T> ID type */ private static class AverageClusteringCoefficientHelper<T> - extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> { - private final String id; - + extends AnalyticHelper<LocalClusteringCoefficient.Result<T>> { private long vertexCount; private double sumOfLocalClusteringCoefficient; - /** - * The unique id is required because Flink's accumulator namespace is - * shared among all operators. 
- * - * @param id unique string used for accumulator names - */ - public AverageClusteringCoefficientHelper(String id) { - this.id = id; - } - - @Override - public void configure(Configuration parameters) {} - - @Override - public void open(int taskNumber, int numTasks) throws IOException {} - @Override public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException { vertexCount++; @@ -138,8 +121,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public void close() throws IOException { - getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); - getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient)); + addAccumulator(VERTEX_COUNT, new LongCounter(vertexCount)); + addAccumulator(SUM_OF_LOCAL_CLUSTERING_COEFFICIENT, new DoubleCounter(sumOfLocalClusteringCoefficient)); } } @@ -148,7 +131,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { */ public static class Result { private long vertexCount; - private double averageLocalClusteringCoefficient; /** http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java index 648fb76..07f4eed 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java @@ -20,13 +20,11 @@ package org.apache.flink.graph.library.metric.directed; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import 
org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongMaximum; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; @@ -36,12 +34,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.AnalyticHelper; import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; -import org.apache.flink.util.AbstractID; import org.apache.flink.util.Collector; import java.io.IOException; @@ -51,17 +49,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** * Compute the following edge metrics in a directed graph: - * - number of vertices - * - number of edges * - number of triangle triplets * - number of rectangle triplets - * - number of triplets - * - maximum degree - * - maximum out degree - * - maximum in degree * - maximum number of triangle triplets * - maximum number of rectangle triplets - * - maximum number of triplets * * @param <K> graph ID type * @param <VV> vertex value type @@ -70,7 +61,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; public class EdgeMetrics<K extends Comparable<K> & 
CopyableValue<K>, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Result> { - private String id = new AbstractID().toString(); + private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount"; + + private static final String RECTANGLE_TRIPLET_COUNT = "rectangleTripletCount"; + + private static final String MAXIMUM_TRIANGLE_TRIPLETS = "maximumTriangleTriplets"; + + private static final String MAXIMUM_RECTANGLE_TRIPLETS = "maximumRectangleTriplets"; + + private EdgeMetricsHelper<K> edgeMetricsHelper; private int parallelism = PARALLELISM_DEFAULT; @@ -121,8 +120,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .setParallelism(parallelism) .name("Sum edge stats"); + edgeMetricsHelper = new EdgeMetricsHelper<>(); + edgeStats - .output(new EdgeMetricsHelper<K, EV>(id)) + .output(edgeMetricsHelper) .setParallelism(parallelism) .name("Edge metrics"); @@ -131,23 +132,14 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public Result getResult() { - JobExecutionResult res = env.getLastJobExecutionResult(); - - long vertexCount = res.getAccumulatorResult(id + "-0"); - long edgeCount = res.getAccumulatorResult(id + "-1"); - long triangleTripletCount = res.getAccumulatorResult(id + "-2"); - long rectangleTripletCount = res.getAccumulatorResult(id + "-3"); - long tripletCount = res.getAccumulatorResult(id + "-4"); - long maximumDegree = res.getAccumulatorResult(id + "-5"); - long maximumOutDegree = res.getAccumulatorResult(id + "-6"); - long maximumInDegree = res.getAccumulatorResult(id + "-7"); - long maximumTriangleTriplets = res.getAccumulatorResult(id + "-8"); - long maximumRectangleTriplets = res.getAccumulatorResult(id + "-9"); - long maximumTriplets = res.getAccumulatorResult(id + "-a"); - - return new Result(vertexCount, edgeCount, triangleTripletCount, rectangleTripletCount, tripletCount, - maximumDegree, maximumOutDegree, maximumInDegree, - maximumTriangleTriplets, maximumRectangleTriplets, maximumTriplets); + long 
triangleTripletCount = edgeMetricsHelper.getAccumulator(env, TRIANGLE_TRIPLET_COUNT); + long rectangleTripletCount = edgeMetricsHelper.getAccumulator(env, RECTANGLE_TRIPLET_COUNT); + long maximumTriangleTriplets = edgeMetricsHelper.getAccumulator(env, MAXIMUM_TRIANGLE_TRIPLETS); + long maximumRectangleTriplets = edgeMetricsHelper.getAccumulator(env, MAXIMUM_RECTANGLE_TRIPLETS); + + // each edge is counted twice, once from each vertex, so must be halved + return new Result(triangleTripletCount, rectangleTripletCount, + maximumTriangleTriplets, maximumRectangleTriplets); } /** @@ -238,34 +230,12 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * * @param <T> ID type */ - private static class EdgeMetricsHelper<T extends Comparable<T>, ET> - extends RichOutputFormat<Tuple3<T, Degrees, LongValue>> { - private final String id; - - private long vertexCount; - private long edgeCount; + private static class EdgeMetricsHelper<T extends Comparable<T>> + extends AnalyticHelper<Tuple3<T, Degrees, LongValue>> { private long triangleTripletCount; private long rectangleTripletCount; - private long tripletCount; - private long maximumDegree; - private long maximumOutDegree; - private long maximumInDegree; private long maximumTriangleTriplets; private long maximumRectangleTriplets; - private long maximumTriplets; - - /** - * This helper class collects edge metrics by scanning over and - * discarding elements from the given DataSet. - * - * The unique id is required because Flink's accumulator namespace is - * among all operators. 
- * - * @param id unique string used for accumulator names - */ - public EdgeMetricsHelper(String id) { - this.id = id; - } @Override public void configure(Configuration parameters) {} @@ -277,42 +247,26 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { public void writeRecord(Tuple3<T, Degrees, LongValue> record) throws IOException { Degrees degrees = record.f1; long degree = degrees.getDegree().getValue(); - long outDegree = degrees.getOutDegree().getValue(); - long inDegree = degrees.getInDegree().getValue(); long lowDegree = record.f2.getValue(); long highDegree = degree - lowDegree; long triangleTriplets = lowDegree * (lowDegree - 1) / 2; long rectangleTriplets = triangleTriplets + lowDegree * highDegree; - long triplets = degree * (degree - 1) / 2; - vertexCount++; - edgeCount += outDegree; triangleTripletCount += triangleTriplets; rectangleTripletCount += rectangleTriplets; - tripletCount += triplets; - maximumDegree = Math.max(maximumDegree, degree); - maximumOutDegree = Math.max(maximumOutDegree, outDegree); - maximumInDegree = Math.max(maximumInDegree, inDegree); + maximumTriangleTriplets = Math.max(maximumTriangleTriplets, triangleTriplets); maximumRectangleTriplets = Math.max(maximumRectangleTriplets, rectangleTriplets); - maximumTriplets = Math.max(maximumTriplets, triplets); } @Override public void close() throws IOException { - getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); - getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); - getRuntimeContext().addAccumulator(id + "-2", new LongCounter(triangleTripletCount)); - getRuntimeContext().addAccumulator(id + "-3", new LongCounter(rectangleTripletCount)); - getRuntimeContext().addAccumulator(id + "-4", new LongCounter(tripletCount)); - getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumDegree)); - getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumOutDegree)); - getRuntimeContext().addAccumulator(id + "-7", new 
LongMaximum(maximumInDegree)); - getRuntimeContext().addAccumulator(id + "-8", new LongMaximum(maximumTriangleTriplets)); - getRuntimeContext().addAccumulator(id + "-9", new LongMaximum(maximumRectangleTriplets)); - getRuntimeContext().addAccumulator(id + "-a", new LongMaximum(maximumTriplets)); + addAccumulator(TRIANGLE_TRIPLET_COUNT, new LongCounter(triangleTripletCount)); + addAccumulator(RECTANGLE_TRIPLET_COUNT, new LongCounter(rectangleTripletCount)); + addAccumulator(MAXIMUM_TRIANGLE_TRIPLETS, new LongMaximum(maximumTriangleTriplets)); + addAccumulator(MAXIMUM_RECTANGLE_TRIPLETS, new LongMaximum(maximumRectangleTriplets)); } } @@ -320,59 +274,17 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * Wraps edge metrics. */ public static class Result { - private long vertexCount; - private long edgeCount; private long triangleTripletCount; private long rectangleTripletCount; - private long tripletCount; - private long maximumDegree; - private long maximumOutDegree; - private long maximumInDegree; private long maximumTriangleTriplets; private long maximumRectangleTriplets; - private long maximumTriplets; - public Result(long vertexCount, long edgeCount, long triangleTripletCount, long rectangleTripletCount, long tripletCount, - long maximumDegree, long maximumOutDegree, long maximumInDegree, - long maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) { - this.vertexCount = vertexCount; - this.edgeCount = edgeCount; + public Result(long triangleTripletCount, long rectangleTripletCount, + long maximumTriangleTriplets, long maximumRectangleTriplets) { this.triangleTripletCount = triangleTripletCount; this.rectangleTripletCount = rectangleTripletCount; - this.tripletCount = tripletCount; - this.maximumDegree = maximumDegree; - this.maximumOutDegree = maximumOutDegree; - this.maximumInDegree = maximumInDegree; this.maximumTriangleTriplets = maximumTriangleTriplets; this.maximumRectangleTriplets = maximumRectangleTriplets; - 
this.maximumTriplets = maximumTriplets; - } - - /** - * Get the number of vertices. - * - * @return number of vertices - */ - public long getNumberOfVertices() { - return vertexCount; - } - - /** - * Get the number of edges. - * - * @return number of edges - */ - public long getNumberOfEdges() { - return edgeCount; - } - - /** - * Get the average degree. - * - * @return average degree - */ - public float getAverageDegree() { - return edgeCount / (float)vertexCount; } /** @@ -394,42 +306,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { } /** - * Get the number of triplets. - * - * @return number of triplets - */ - public long getNumberOfTriplets() { - return tripletCount; - } - - /** - * Get the maximum degree. - * - * @return maximum degree - */ - public long getMaximumDegree() { - return maximumDegree; - } - - /** - * Get the maximum out degree. - * - * @return maximum out degree - */ - public long getMaximumOutDegree() { - return maximumOutDegree; - } - - /** - * Get the maximum in degree. - * - * @return maximum in degree - */ - public long getMaximumInDegree() { - return maximumInDegree; - } - - /** * Get the maximum triangle triplets. * * @return maximum triangle triplets @@ -447,47 +323,23 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return maximumRectangleTriplets; } - /** - * Get the maximum triplets. 
- * - * @return maximum triplets - */ - public long getMaximumTriplets() { - return maximumTriplets; - } - @Override public String toString() { NumberFormat nf = NumberFormat.getInstance(); - return "vertex count: " + nf.format(vertexCount) - + "; edge count: " + nf.format(edgeCount) - + "; average degree: " + nf.format(getAverageDegree()) - + "; triangle triplet count: " + nf.format(triangleTripletCount) + return "triangle triplet count: " + nf.format(triangleTripletCount) + "; rectangle triplet count: " + nf.format(rectangleTripletCount) - + "; triplet count: " + nf.format(tripletCount) - + "; maximum degree: " + nf.format(maximumDegree) - + "; maximum out degree: " + nf.format(maximumOutDegree) - + "; maximum in degree: " + nf.format(maximumInDegree) + "; maximum triangle triplets: " + nf.format(maximumTriangleTriplets) - + "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets) - + "; maximum triplets: " + nf.format(maximumTriplets); + + "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets); } @Override public int hashCode() { return new HashCodeBuilder() - .append(vertexCount) - .append(edgeCount) .append(triangleTripletCount) .append(rectangleTripletCount) - .append(tripletCount) - .append(maximumDegree) - .append(maximumOutDegree) - .append(maximumInDegree) .append(maximumTriangleTriplets) .append(maximumRectangleTriplets) - .append(maximumTriplets) .hashCode(); } @@ -500,17 +352,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { Result rhs = (Result)obj; return new EqualsBuilder() - .append(vertexCount, rhs.vertexCount) - .append(edgeCount, rhs.edgeCount) .append(triangleTripletCount, rhs.triangleTripletCount) .append(rectangleTripletCount, rhs.rectangleTripletCount) - .append(tripletCount, rhs.tripletCount) - .append(maximumDegree, rhs.maximumDegree) - .append(maximumOutDegree, rhs.maximumOutDegree) - .append(maximumInDegree, rhs.maximumInDegree) .append(maximumTriangleTriplets, rhs.maximumTriangleTriplets) 
.append(maximumRectangleTriplets, rhs.maximumRectangleTriplets) - .append(maximumTriplets, rhs.maximumTriplets) .isEquals(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java index 909eea5..1285bb9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java @@ -20,20 +20,17 @@ package org.apache.flink.graph.library.metric.directed; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongMaximum; -import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.DataSet; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.AnalyticHelper; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result; import org.apache.flink.types.CopyableValue; -import org.apache.flink.util.AbstractID; import java.io.IOException; import java.text.NumberFormat; @@ -44,6 +41,9 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * Compute the following vertex metrics in a directed graph: * - number of vertices * - number of edges + * - number of unidirectional edges + * - number of bidirectional edges + * - average degree * - number of triplets * - maximum degree * - maximum out degree @@ -57,7 +57,23 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Result> { - private String id = new AbstractID().toString(); + private static final String VERTEX_COUNT = "vertexCount"; + + private static final String UNIDIRECTIONAL_EDGE_COUNT = "unidirectionalEdgeCount"; + + private static final String BIDIRECTIONAL_EDGE_COUNT = "bidirectionalEdgeCount"; + + private static final String TRIPLET_COUNT = "tripletCount"; + + private static final String MAXIMUM_DEGREE = "maximumDegree"; + + private static final String MAXIMUM_OUT_DEGREE = "maximumOutDegree"; + + private static final String MAXIMUM_IN_DEGREE = "maximumInDegree"; + + private static final String MAXIMUM_TRIPLETS = "maximumTriplets"; + + private VertexMetricsHelper<K> vertexMetricsHelper; // Optional configuration private boolean includeZeroDegreeVertices = false; @@ -101,8 +117,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .setIncludeZeroDegreeVertices(includeZeroDegreeVertices) .setParallelism(parallelism)); + vertexMetricsHelper = new VertexMetricsHelper<>(); + vertexDegree - .output(new VertexMetricsHelper<K>(id)) + .output(vertexMetricsHelper) .name("Vertex metrics"); return this; @@ -110,17 +128,18 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public Result getResult() { - JobExecutionResult res = env.getLastJobExecutionResult(); - - long vertexCount = res.getAccumulatorResult(id + "-0"); - long edgeCount = res.getAccumulatorResult(id + "-1"); - long tripletCount = res.getAccumulatorResult(id + "-2"); 
- long maximumDegree = res.getAccumulatorResult(id + "-3"); - long maximumOutDegree = res.getAccumulatorResult(id + "-4"); - long maximumInDegree = res.getAccumulatorResult(id + "-5"); - long maximumTriplets = res.getAccumulatorResult(id + "-6"); - - return new Result(vertexCount, edgeCount, tripletCount, maximumDegree, maximumOutDegree, maximumInDegree, maximumTriplets); + long vertexCount = vertexMetricsHelper.getAccumulator(env, VERTEX_COUNT); + long unidirectionalEdgeCount = vertexMetricsHelper.getAccumulator(env, UNIDIRECTIONAL_EDGE_COUNT); + long bidirectionalEdgeCount = vertexMetricsHelper.getAccumulator(env, BIDIRECTIONAL_EDGE_COUNT); + long tripletCount = vertexMetricsHelper.getAccumulator(env, TRIPLET_COUNT); + long maximumDegree = vertexMetricsHelper.getAccumulator(env, MAXIMUM_DEGREE); + long maximumOutDegree = vertexMetricsHelper.getAccumulator(env, MAXIMUM_OUT_DEGREE); + long maximumInDegree = vertexMetricsHelper.getAccumulator(env, MAXIMUM_IN_DEGREE); + long maximumTriplets = vertexMetricsHelper.getAccumulator(env, MAXIMUM_TRIPLETS); + + // each edge is counted twice, once from each vertex, so must be halved + return new Result(vertexCount, unidirectionalEdgeCount / 2, bidirectionalEdgeCount / 2, tripletCount, + maximumDegree, maximumOutDegree, maximumInDegree, maximumTriplets); } /** @@ -129,45 +148,28 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * @param <T> ID type */ private static class VertexMetricsHelper<T> - extends RichOutputFormat<Vertex<T, Degrees>> { - private final String id; - + extends AnalyticHelper<Vertex<T, Degrees>> { private long vertexCount; - private long edgeCount; + private long unidirectionalEdgeCount; + private long bidirectionalEdgeCount; private long tripletCount; private long maximumDegree; private long maximumOutDegree; private long maximumInDegree; private long maximumTriplets; - /** - * This helper class collects vertex metrics by scanning over and - * discarding elements from the given DataSet. 
- * - * The unique id is required because Flink's accumulator namespace is - * shared among all operators. - * - * @param id unique string used for accumulator names - */ - public VertexMetricsHelper(String id) { - this.id = id; - } - - @Override - public void configure(Configuration parameters) {} - - @Override - public void open(int taskNumber, int numTasks) throws IOException {} - @Override public void writeRecord(Vertex<T, Degrees> record) throws IOException { long degree = record.f1.getDegree().getValue(); long outDegree = record.f1.getOutDegree().getValue(); long inDegree = record.f1.getInDegree().getValue(); + + long bidirectionalEdges = outDegree + inDegree - degree; long triplets = degree * (degree - 1) / 2; vertexCount++; - edgeCount += outDegree; + unidirectionalEdgeCount += degree - bidirectionalEdges; + bidirectionalEdgeCount += bidirectionalEdges; tripletCount += triplets; maximumDegree = Math.max(maximumDegree, degree); maximumOutDegree = Math.max(maximumOutDegree, outDegree); @@ -177,13 +179,14 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public void close() throws IOException { - getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); - getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); - getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount)); - getRuntimeContext().addAccumulator(id + "-3", new LongMaximum(maximumDegree)); - getRuntimeContext().addAccumulator(id + "-4", new LongMaximum(maximumOutDegree)); - getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumInDegree)); - getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumTriplets)); + addAccumulator(VERTEX_COUNT, new LongCounter(vertexCount)); + addAccumulator(UNIDIRECTIONAL_EDGE_COUNT, new LongCounter(unidirectionalEdgeCount)); + addAccumulator(BIDIRECTIONAL_EDGE_COUNT, new LongCounter(bidirectionalEdgeCount)); + addAccumulator(TRIPLET_COUNT, new LongCounter(tripletCount)); + 
addAccumulator(MAXIMUM_DEGREE, new LongMaximum(maximumDegree)); + addAccumulator(MAXIMUM_OUT_DEGREE, new LongMaximum(maximumOutDegree)); + addAccumulator(MAXIMUM_IN_DEGREE, new LongMaximum(maximumInDegree)); + addAccumulator(MAXIMUM_TRIPLETS, new LongMaximum(maximumTriplets)); } } @@ -192,16 +195,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { */ public static class Result { private long vertexCount; - private long edgeCount; + private long unidirectionalEdgeCount; + private long bidirectionalEdgeCount; private long tripletCount; private long maximumDegree; private long maximumOutDegree; private long maximumInDegree; private long maximumTriplets; - public Result(long vertexCount, long edgeCount, long tripletCount, long maximumDegree, long maximumOutDegree, long maximumInDegree, long maximumTriplets) { + public Result(long vertexCount, long unidirectionalEdgeCount, long bidirectionalEdgeCount, long tripletCount, + long maximumDegree, long maximumOutDegree, long maximumInDegree, long maximumTriplets) { this.vertexCount = vertexCount; - this.edgeCount = edgeCount; + this.unidirectionalEdgeCount = unidirectionalEdgeCount; + this.bidirectionalEdgeCount = bidirectionalEdgeCount; this.tripletCount = tripletCount; this.maximumDegree = maximumDegree; this.maximumOutDegree = maximumOutDegree; @@ -224,7 +230,25 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * @return number of edges */ public long getNumberOfEdges() { - return edgeCount; + return unidirectionalEdgeCount + 2 * bidirectionalEdgeCount; + } + + /** + * Get the number of unidirectional edges. + * + * @return number of unidirectional edges + */ + public long getNumberOfDirectedEdges() { + return unidirectionalEdgeCount; + } + + /** + * Get the number of bidirectional edges. 
+ * + * @return number of bidirectional edges + */ + public long getNumberOfUndirectedEdges() { + return bidirectionalEdgeCount; } /** @@ -233,7 +257,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * @return average degree */ public float getAverageDegree() { - return edgeCount / (float)vertexCount; + return getNumberOfEdges() / (float)vertexCount; } /** @@ -286,7 +310,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { NumberFormat nf = NumberFormat.getInstance(); return "vertex count: " + nf.format(vertexCount) - + "; edge count: " + nf.format(edgeCount) + + "; edge count: " + nf.format(getNumberOfEdges()) + + "; unidirectional edge count: " + nf.format(unidirectionalEdgeCount) + + "; bidirectional edge count: " + nf.format(bidirectionalEdgeCount) + "; average degree: " + nf.format(getAverageDegree()) + "; triplet count: " + nf.format(tripletCount) + "; maximum degree: " + nf.format(maximumDegree) @@ -299,7 +325,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { public int hashCode() { return new HashCodeBuilder() .append(vertexCount) - .append(edgeCount) + .append(unidirectionalEdgeCount) + .append(bidirectionalEdgeCount) .append(tripletCount) .append(maximumDegree) .append(maximumOutDegree) @@ -318,7 +345,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return new EqualsBuilder() .append(vertexCount, rhs.vertexCount) - .append(edgeCount, rhs.edgeCount) + .append(unidirectionalEdgeCount, rhs.unidirectionalEdgeCount) + .append(bidirectionalEdgeCount, rhs.bidirectionalEdgeCount) .append(tripletCount, rhs.tripletCount) .append(maximumDegree, rhs.maximumDegree) .append(maximumOutDegree, rhs.maximumOutDegree) http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java index 1c636ff..a4deeaf 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java @@ -20,25 +20,22 @@ package org.apache.flink.graph.library.metric.undirected; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongMaximum; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.AnalyticHelper; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair; import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; -import org.apache.flink.util.AbstractID; import java.io.IOException; import java.text.NumberFormat; @@ -47,15 +44,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** * Compute the following edge metrics in an undirected graph: - * - number 
of vertices - * - number of edges * - number of triangle triplets * - number of rectangle triplets - * - number of triplets - * - maximum degree * - maximum number of triangle triplets * - maximum number of rectangle triplets - * - maximum number of triplets * * @param <K> graph ID type * @param <VV> vertex value type @@ -64,7 +56,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Result> { - private String id = new AbstractID().toString(); + private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount"; + + private static final String RECTANGLE_TRIPLET_COUNT = "rectangleTripletCount"; + + private static final String MAXIMUM_TRIANGLE_TRIPLETS = "maximumTriangleTriplets"; + + private static final String MAXIMUM_RECTANGLE_TRIPLETS = "maximumRectangleTriplets"; + + private EdgeMetricsHelper<K> edgeMetricsHelper; // Optional configuration private boolean reduceOnTargetId = false; @@ -127,8 +127,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .setParallelism(parallelism) .name("Sum edge stats"); + edgeMetricsHelper = new EdgeMetricsHelper<>(); + edgeStats - .output(new EdgeMetricsHelper<K, EV>(id)) + .output(edgeMetricsHelper) .setParallelism(parallelism) .name("Edge metrics"); @@ -137,20 +139,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public Result getResult() { - JobExecutionResult res = env.getLastJobExecutionResult(); - - long vertexCount = res.getAccumulatorResult(id + "-0"); - long edgeCount = res.getAccumulatorResult(id + "-1"); - long triangleTripletCount = res.getAccumulatorResult(id + "-2"); - long rectangleTripletCount = res.getAccumulatorResult(id + "-3"); - long tripletCount = res.getAccumulatorResult(id + "-4"); - long maximumDegree = res.getAccumulatorResult(id + "-5"); - long maximumTriangleTriplets = res.getAccumulatorResult(id + "-6"); - long 
maximumRectangleTriplets = res.getAccumulatorResult(id + "-7"); - long maximumTriplets = res.getAccumulatorResult(id + "-8"); - - return new Result(vertexCount, edgeCount / 2, triangleTripletCount, rectangleTripletCount, tripletCount, - maximumDegree, maximumTriangleTriplets, maximumRectangleTriplets, maximumTriplets); + long triangleTripletCount = edgeMetricsHelper.getAccumulator(env, TRIANGLE_TRIPLET_COUNT); + long rectangleTripletCount = edgeMetricsHelper.getAccumulator(env, RECTANGLE_TRIPLET_COUNT); + long maximumTriangleTriplets = edgeMetricsHelper.getAccumulator(env, MAXIMUM_TRIANGLE_TRIPLETS); + long maximumRectangleTriplets = edgeMetricsHelper.getAccumulator(env, MAXIMUM_RECTANGLE_TRIPLETS); + + return new Result(triangleTripletCount, rectangleTripletCount, + maximumTriangleTriplets, maximumRectangleTriplets); } /** @@ -215,71 +210,36 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * * @param <T> ID type */ - private static class EdgeMetricsHelper<T extends Comparable<T>, ET> - extends RichOutputFormat<Tuple3<T, LongValue, LongValue>> { - private final String id; - - private long vertexCount; - private long edgeCount; + private static class EdgeMetricsHelper<T extends Comparable<T>> + extends AnalyticHelper<Tuple3<T, LongValue, LongValue>> { private long triangleTripletCount; private long rectangleTripletCount; - private long tripletCount; - private long maximumDegree; private long maximumTriangleTriplets; private long maximumRectangleTriplets; - private long maximumTriplets; - - /** - * This helper class collects edge metrics by scanning over and - * discarding elements from the given DataSet. - * - * The unique id is required because Flink's accumulator namespace is - * among all operators. 
- * - * @param id unique string used for accumulator names - */ - public EdgeMetricsHelper(String id) { - this.id = id; - } - - @Override - public void configure(Configuration parameters) {} - - @Override - public void open(int taskNumber, int numTasks) throws IOException {} @Override public void writeRecord(Tuple3<T, LongValue, LongValue> record) throws IOException { long degree = record.f1.getValue(); + long lowDegree = record.f2.getValue(); long highDegree = degree - lowDegree; long triangleTriplets = lowDegree * (lowDegree - 1) / 2; long rectangleTriplets = triangleTriplets + lowDegree * highDegree; - long triplets = degree * (degree - 1) / 2; - vertexCount++; - edgeCount += degree; triangleTripletCount += triangleTriplets; rectangleTripletCount += rectangleTriplets; - tripletCount += triplets; - maximumDegree = Math.max(maximumDegree, degree); + maximumTriangleTriplets = Math.max(maximumTriangleTriplets, triangleTriplets); maximumRectangleTriplets = Math.max(maximumRectangleTriplets, rectangleTriplets); - maximumTriplets = Math.max(maximumTriplets, triplets); } @Override public void close() throws IOException { - getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); - getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); - getRuntimeContext().addAccumulator(id + "-2", new LongCounter(triangleTripletCount)); - getRuntimeContext().addAccumulator(id + "-3", new LongCounter(rectangleTripletCount)); - getRuntimeContext().addAccumulator(id + "-4", new LongCounter(tripletCount)); - getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumDegree)); - getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumTriangleTriplets)); - getRuntimeContext().addAccumulator(id + "-7", new LongMaximum(maximumRectangleTriplets)); - getRuntimeContext().addAccumulator(id + "-8", new LongMaximum(maximumTriplets)); + addAccumulator(TRIANGLE_TRIPLET_COUNT, new LongCounter(triangleTripletCount)); + 
addAccumulator(RECTANGLE_TRIPLET_COUNT, new LongCounter(rectangleTripletCount)); + addAccumulator(MAXIMUM_TRIANGLE_TRIPLETS, new LongMaximum(maximumTriangleTriplets)); + addAccumulator(MAXIMUM_RECTANGLE_TRIPLETS, new LongMaximum(maximumRectangleTriplets)); } } @@ -287,54 +247,17 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * Wraps edge metrics. */ public static class Result { - private long vertexCount; - private long edgeCount; private long triangleTripletCount; private long rectangleTripletCount; - private long tripletCount; - private long maximumDegree; private long maximumTriangleTriplets; private long maximumRectangleTriplets; - private long maximumTriplets; - public Result(long vertexCount, long edgeCount, long triangleTripletCount, long rectangleTripletCount, long tripletCount, - long maximumDegree, long maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) { - this.vertexCount = vertexCount; - this.edgeCount = edgeCount; + public Result(long triangleTripletCount, long rectangleTripletCount, + long maximumTriangleTriplets, long maximumRectangleTriplets) { this.triangleTripletCount = triangleTripletCount; this.rectangleTripletCount = rectangleTripletCount; - this.tripletCount = tripletCount; - this.maximumDegree = maximumDegree; this.maximumTriangleTriplets = maximumTriangleTriplets; this.maximumRectangleTriplets = maximumRectangleTriplets; - this.maximumTriplets = maximumTriplets; - } - - /** - * Get the number of vertices. - * - * @return number of vertices - */ - public long getNumberOfVertices() { - return vertexCount; - } - - /** - * Get the number of edges. - * - * @return number of edges - */ - public long getNumberOfEdges() { - return edgeCount; - } - - /** - * Get the average degree. - * - * @return average degree - */ - public float getAverageDegree() { - return edgeCount / (float)vertexCount; } /** @@ -356,24 +279,6 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { } /** - * Get the number of triplets. 
- * - * @return number of triplets - */ - public long getNumberOfTriplets() { - return tripletCount; - } - - /** - * Get the maximum degree. - * - * @return maximum degree - */ - public long getMaximumDegree() { - return maximumDegree; - } - - /** * Get the maximum triangle triplets. * * @return maximum triangle triplets @@ -391,43 +296,23 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return maximumRectangleTriplets; } - /** - * Get the maximum triplets. - * - * @return maximum triplets - */ - public long getMaximumTriplets() { - return maximumTriplets; - } - @Override public String toString() { NumberFormat nf = NumberFormat.getInstance(); - return "vertex count: " + nf.format(vertexCount) - + "; edge count: " + nf.format(edgeCount) - + "; average degree: " + nf.format(getAverageDegree()) - + "; triangle triplet count: " + nf.format(triangleTripletCount) + return "triangle triplet count: " + nf.format(triangleTripletCount) + "; rectangle triplet count: " + nf.format(rectangleTripletCount) - + "; triplet count: " + nf.format(tripletCount) - + "; maximum degree: " + nf.format(maximumDegree) + "; maximum triangle triplets: " + nf.format(maximumTriangleTriplets) - + "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets) - + "; maximum triplets: " + nf.format(maximumTriplets); + + "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets); } @Override public int hashCode() { return new HashCodeBuilder() - .append(vertexCount) - .append(edgeCount) .append(triangleTripletCount) .append(rectangleTripletCount) - .append(tripletCount) - .append(maximumDegree) .append(maximumTriangleTriplets) .append(maximumRectangleTriplets) - .append(maximumTriplets) .hashCode(); } @@ -440,15 +325,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { Result rhs = (Result)obj; return new EqualsBuilder() - .append(vertexCount, rhs.vertexCount) - .append(edgeCount, rhs.edgeCount) .append(triangleTripletCount, rhs.triangleTripletCount) 
.append(rectangleTripletCount, rhs.rectangleTripletCount) - .append(tripletCount, rhs.tripletCount) - .append(maximumDegree, rhs.maximumDegree) .append(maximumTriangleTriplets, rhs.maximumTriangleTriplets) .append(maximumRectangleTriplets, rhs.maximumRectangleTriplets) - .append(maximumTriplets, rhs.maximumTriplets) .isEquals(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java index 8012605..ee67129 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -20,20 +20,17 @@ package org.apache.flink.graph.library.metric.undirected; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongMaximum; -import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.DataSet; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.AnalyticHelper; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result; import org.apache.flink.types.CopyableValue; import 
org.apache.flink.types.LongValue; -import org.apache.flink.util.AbstractID; import java.io.IOException; import java.text.NumberFormat; @@ -44,6 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * Compute the following vertex metrics in an undirected graph: * - number of vertices * - number of edges + * - average degree * - number of triplets * - maximum degree * - maximum number of triplets @@ -55,7 +53,17 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Result> { - private String id = new AbstractID().toString(); + private static final String VERTEX_COUNT = "vertexCount"; + + private static final String EDGE_COUNT = "edgeCount"; + + private static final String TRIPLET_COUNT = "tripletCount"; + + private static final String MAXIMUM_DEGREE = "maximumDegree"; + + private static final String MAXIMUM_TRIPLETS = "maximumTriplets"; + + private VertexMetricsHelper<K> vertexMetricsHelper; // Optional configuration private boolean includeZeroDegreeVertices = false; @@ -117,8 +125,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .setReduceOnTargetId(reduceOnTargetId) .setParallelism(parallelism)); + vertexMetricsHelper = new VertexMetricsHelper<>(); + vertexDegree - .output(new VertexMetricsHelper<K>(id)) + .output(vertexMetricsHelper) .name("Vertex metrics"); return this; @@ -126,14 +136,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public Result getResult() { - JobExecutionResult res = env.getLastJobExecutionResult(); - - long vertexCount = res.getAccumulatorResult(id + "-0"); - long edgeCount = res.getAccumulatorResult(id + "-1"); - long tripletCount = res.getAccumulatorResult(id + "-2"); - long maximumDegree = res.getAccumulatorResult(id + "-3"); - long maximumTriplets = res.getAccumulatorResult(id + "-4"); + long vertexCount = 
vertexMetricsHelper.getAccumulator(env, VERTEX_COUNT); + long edgeCount = vertexMetricsHelper.getAccumulator(env, EDGE_COUNT); + long tripletCount = vertexMetricsHelper.getAccumulator(env, TRIPLET_COUNT); + long maximumDegree = vertexMetricsHelper.getAccumulator(env, MAXIMUM_DEGREE); + long maximumTriplets = vertexMetricsHelper.getAccumulator(env, MAXIMUM_TRIPLETS); + // each edge is counted twice, once from each vertex, so must be halved return new Result(vertexCount, edgeCount / 2, tripletCount, maximumDegree, maximumTriplets); } @@ -143,34 +152,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * @param <T> ID type */ private static class VertexMetricsHelper<T> - extends RichOutputFormat<Vertex<T, LongValue>> { - private final String id; - + extends AnalyticHelper<Vertex<T, LongValue>> { private long vertexCount; private long edgeCount; private long tripletCount; private long maximumDegree; private long maximumTriplets; - /** - * This helper class collects vertex metrics by scanning over and - * discarding elements from the given DataSet. - * - * The unique id is required because Flink's accumulator namespace is - * shared among all operators. 
- * - * @param id unique string used for accumulator names - */ - public VertexMetricsHelper(String id) { - this.id = id; - } - - @Override - public void configure(Configuration parameters) {} - - @Override - public void open(int taskNumber, int numTasks) throws IOException {} - @Override public void writeRecord(Vertex<T, LongValue> record) throws IOException { long degree = record.f1.getValue(); @@ -185,11 +173,11 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public void close() throws IOException { - getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); - getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); - getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount)); - getRuntimeContext().addAccumulator(id + "-3", new LongMaximum(maximumDegree)); - getRuntimeContext().addAccumulator(id + "-4", new LongMaximum(maximumTriplets)); + addAccumulator(VERTEX_COUNT, new LongCounter(vertexCount)); + addAccumulator(EDGE_COUNT, new LongCounter(edgeCount)); + addAccumulator(TRIPLET_COUNT, new LongCounter(tripletCount)); + addAccumulator(MAXIMUM_DEGREE, new LongMaximum(maximumDegree)); + addAccumulator(MAXIMUM_TRIPLETS, new LongMaximum(maximumTriplets)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java index af5a154..9711cc0 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java @@ -34,7 +34,7 
@@ extends AsmTestBase { @Test public void testWithSimpleGraph() throws Exception { - Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 2, 3, 1, 3, 6); + Result expectedResult = new Result(2, 6, 1, 3); Result edgeMetrics = new EdgeMetrics<IntValue, NullValue, NullValue>() .run(directedSimpleGraph) @@ -47,13 +47,11 @@ extends AsmTestBase { public void testWithCompleteGraph() throws Exception { long expectedDegree = completeGraphVertexCount - 1; - long expectedEdges = completeGraphVertexCount * expectedDegree; long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets; - Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets, - expectedDegree, expectedDegree, expectedDegree, - expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets); + Result expectedResult = new Result(expectedTriplets / 3, 2 * expectedTriplets / 3, + expectedMaximumTriplets, expectedMaximumTriplets); Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>() .run(completeGraph) @@ -67,7 +65,7 @@ extends AsmTestBase { throws Exception { Result expectedResult; - expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + expectedResult = new Result(0, 0, 0, 0); Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>() .run(emptyGraph) @@ -79,7 +77,7 @@ extends AsmTestBase { @Test public void testWithRMatGraph() throws Exception { - Result expectedResult = new Result(902, 12009, 107817, 315537, 1003442, 463, 334, 342, 820, 3822, 106953); + Result expectedResult = new Result(107817, 315537, 820, 3822); Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>() .run(directedRMatGraph) 
http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java index e4362c0..c4ec8f8 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java @@ -34,7 +34,7 @@ extends AsmTestBase { @Test public void testWithSimpleGraph() throws Exception { - Result expectedResult = new Result(6, 7, 13, 4, 2, 3, 6); + Result expectedResult = new Result(6, 7, 0, 13, 4, 2, 3, 6); Result vertexMetrics = new VertexMetrics<IntValue, NullValue, NullValue>() .run(directedSimpleGraph) @@ -47,11 +47,12 @@ extends AsmTestBase { public void testWithCompleteGraph() throws Exception { long expectedDegree = completeGraphVertexCount - 1; - long expectedEdges = completeGraphVertexCount * expectedDegree; + long expectedBidirectionalEdges = completeGraphVertexCount * expectedDegree / 2; long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets; - Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets, expectedDegree, expectedDegree, expectedDegree, expectedMaximumTriplets); + Result expectedResult = new Result(completeGraphVertexCount, 0, expectedBidirectionalEdges, + expectedTriplets, expectedDegree, expectedDegree, expectedDegree, expectedMaximumTriplets); Result vertexMetrics = new VertexMetrics<LongValue, NullValue, NullValue>() .run(completeGraph) @@ 
-65,7 +66,7 @@ extends AsmTestBase { throws Exception { Result expectedResult; - expectedResult = new Result(0, 0, 0, 0, 0, 0, 0); + expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0); Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(false) @@ -74,7 +75,7 @@ extends AsmTestBase { assertEquals(withoutZeroDegreeVertices, expectedResult); - expectedResult = new Result(3, 0, 0, 0, 0, 0, 0); + expectedResult = new Result(3, 0, 0, 0, 0, 0, 0, 0); Result withZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>() .setIncludeZeroDegreeVertices(true) @@ -87,7 +88,7 @@ extends AsmTestBase { @Test public void testWithRMatGraph() throws Exception { - Result expectedResult = new Result(902, 12009, 1003442, 463, 334, 342, 106953); + Result expectedResult = new Result(902, 8875, 1567, 1003442, 463, 334, 342, 106953); Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>() .run(directedRMatGraph) http://git-wip-us.apache.org/repos/asf/flink/blob/f025c455/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java index b300d66..cb75331 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java @@ -34,7 +34,7 @@ extends AsmTestBase { @Test public void testWithSimpleGraph() throws Exception { - Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 1, 3, 6); + Result expectedResult = new Result(2, 6, 1, 3); Result edgeMetrics = 
new EdgeMetrics<IntValue, NullValue, NullValue>() .run(undirectedSimpleGraph) @@ -47,12 +47,11 @@ extends AsmTestBase { public void testWithCompleteGraph() throws Exception { long expectedDegree = completeGraphVertexCount - 1; - long expectedEdges = completeGraphVertexCount * expectedDegree / 2; long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets; - Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets, - expectedDegree, expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets); + Result expectedResult = new Result(expectedTriplets / 3, 2 * expectedTriplets / 3, + expectedMaximumTriplets, expectedMaximumTriplets); Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>() .run(completeGraph) @@ -66,7 +65,7 @@ extends AsmTestBase { throws Exception { Result expectedResult; - expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0); + expectedResult = new Result(0, 0, 0, 0); Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>() .run(emptyGraph) @@ -78,7 +77,7 @@ extends AsmTestBase { @Test public void testWithRMatGraph() throws Exception { - Result expectedResult = new Result(902, 10442, 107817, 315537, 1003442, 463, 820, 3822, 106953); + Result expectedResult = new Result(107817, 315537, 820, 3822); Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>() .run(undirectedRMatGraph)
