[FLINK-3965] [gelly] Delegating GraphAlgorithm

A delegating GraphAlgorithm wraps a GraphAlgorithm result with a delegating proxy object. The delegated object can be replaced when the same algorithm is run on the same input with a mergeable configuration. This allows algorithms to be composed of implicitly reusable algorithms without publicly sharing intermediate DataSets.
This closes #2032 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/149e7a01 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/149e7a01 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/149e7a01 Branch: refs/heads/master Commit: 149e7a01445b4ba494409472dc8b0b15c7221e9e Parents: 7ab6837 Author: Greg Hogan <[email protected]> Authored: Wed May 25 06:43:41 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Fri Jul 1 14:35:42 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 2 +- .../annotate/directed/EdgeDegreesPair.java | 27 ++- .../annotate/directed/EdgeSourceDegrees.java | 27 ++- .../annotate/directed/EdgeTargetDegrees.java | 27 ++- .../degree/annotate/directed/VertexDegrees.java | 43 ++++- .../annotate/directed/VertexInDegree.java | 42 ++++- .../annotate/directed/VertexOutDegree.java | 42 ++++- .../annotate/undirected/EdgeDegreePair.java | 36 +++- .../annotate/undirected/EdgeSourceDegree.java | 34 +++- .../annotate/undirected/EdgeTargetDegree.java | 34 +++- .../annotate/undirected/VertexDegree.java | 52 +++++- .../degree/filter/undirected/MaximumDegree.java | 57 ++++-- .../graph/asm/simple/directed/Simplify.java | 26 ++- .../graph/asm/simple/undirected/Simplify.java | 34 +++- .../asm/translate/TranslateEdgeValues.java | 35 +++- .../graph/asm/translate/TranslateGraphIds.java | 35 +++- .../asm/translate/TranslateVertexValues.java | 35 +++- .../directed/LocalClusteringCoefficient.java | 26 ++- .../clustering/directed/TriangleListing.java | 35 +++- .../undirected/LocalClusteringCoefficient.java | 27 ++- .../clustering/undirected/TriangleListing.java | 34 +++- .../flink/graph/library/link_analysis/HITS.java | 32 +++- .../graph/library/similarity/AdamicAdar.java | 35 +++- .../graph/library/similarity/JaccardIndex.java | 39 +++- .../flink/graph/utils/proxy/Delegate.java | 95 ++++++++++ 
.../proxy/GraphAlgorithmDelegatingDataSet.java | 150 +++++++++++++++ .../proxy/GraphAlgorithmDelegatingGraph.java | 160 ++++++++++++++++ .../graph/utils/proxy/OptionalBoolean.java | 135 ++++++++++++++ .../annotate/undirected/VertexDegreeTest.java | 2 +- .../graph/utils/proxy/OptionalBooleanTest.java | 181 +++++++++++++++++++ 30 files changed, 1424 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 530de4b..e3b2ec2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1749,7 +1749,7 @@ public abstract class DataSet<T> { // -------------------------------------------------------------------------------------------- protected static void checkSameExecutionContext(DataSet<?> set1, DataSet<?> set2) { - if (set1.context != set2.context) { + if (set1.getExecutionEnvironment() != set2.getExecutionEnvironment()) { throw new IllegalArgumentException("The two inputs have different execution contexts."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java index 40af5ce..be19613 100644 --- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java @@ -24,10 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -40,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeDegreesPair<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -58,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees } @Override - public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return EdgeDegreesPair.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
EdgeDegreesPair.class.isAssignableFrom(other.getClass())) { + return false; + } + + EdgeDegreesPair rhs = (EdgeDegreesPair) other; + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, t, d(s) DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java index e08ee56..ee3175e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java @@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -39,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeSourceDegrees<K, 
VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> { } @Override - public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return EdgeSourceDegrees.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) { + return false; + } + + EdgeSourceDegrees rhs = (EdgeSourceDegrees) other; + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, d(s) DataSet<Vertex<K, Degrees>> vertexDegrees = input http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java index 5110513..6ba47f2 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java @@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import 
org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -39,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeTargetDegrees<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> { } @Override - public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return EdgeTargetDegrees.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) { + return false; + } + + EdgeTargetDegrees rhs = (EdgeTargetDegrees) other; + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input) throws Exception { // t, d(t) DataSet<Vertex<K, Degrees>> vertexDegrees = input http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 1f1d4ab..363ad2e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -30,13 +30,15 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeOrder; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.ByteValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -48,10 +50,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexDegrees<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> { // Optional configuration - private boolean includeZeroDegreeVertices = false; + private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); private int parallelism = PARALLELISM_DEFAULT; @@ -65,7 +67,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> { * @return this */ public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { - this.includeZeroDegreeVertices = includeZeroDegreeVertices; + this.includeZeroDegreeVertices.set(includeZeroDegreeVertices); return this; } @@ -83,7 +85,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> { } @Override - public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return VertexOutDegree.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
VertexDegrees.class.isAssignableFrom(other.getClass())) { + return false; + } + + VertexDegrees rhs = (VertexDegrees) other; + + // verify that configurations can be merged + + if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) { + return false; + } + + // merge configurations + + includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, t, bitmask DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges() @@ -103,7 +134,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> { .setParallelism(parallelism) .name("Degree count"); - if (includeZeroDegreeVertices) { + if (includeZeroDegreeVertices.get()) { vertexDegrees = input.getVertices() .leftOuterJoin(vertexDegrees) .where(0) http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index 1541abd..75f2369 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -38,10 +39,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexInDegree<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { // Optional configuration - private boolean includeZeroDegreeVertices = false; + private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); private int parallelism = PARALLELISM_DEFAULT; @@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { * @return this */ public VertexInDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { - this.includeZeroDegreeVertices = includeZeroDegreeVertices; + this.includeZeroDegreeVertices.set(includeZeroDegreeVertices); return this; } @@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { } @Override - public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return VertexInDegree.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
VertexInDegree.class.isAssignableFrom(other.getClass())) { + return false; + } + + VertexInDegree rhs = (VertexInDegree) other; + + // verify that configurations can be merged + + if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) { + return false; + } + + // merge configurations + + includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input) throws Exception { // t DataSet<Vertex<K, LongValue>> targetIds = input @@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .setParallelism(parallelism) .name("Degree count"); - if (includeZeroDegreeVertices) { + if (includeZeroDegreeVertices.get()) { targetDegree = input.getVertices() .leftOuterJoin(targetDegree) .where(0) http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index 22c0a67..b0576f8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -38,10 +39,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexOutDegree<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { // Optional configuration - private boolean includeZeroDegreeVertices = false; + private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); private int parallelism = PARALLELISM_DEFAULT; @@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { * @return this */ public VertexOutDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { - this.includeZeroDegreeVertices = includeZeroDegreeVertices; + this.includeZeroDegreeVertices.set(includeZeroDegreeVertices); return this; } @@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { } @Override - public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return VertexOutDegree.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
VertexOutDegree.class.isAssignableFrom(other.getClass())) { + return false; + } + + VertexOutDegree rhs = (VertexOutDegree) other; + + // verify that configurations can be merged + + if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) { + return false; + } + + // merge configurations + + includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input) throws Exception { // s DataSet<Vertex<K, LongValue>> sourceIds = input @@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .setParallelism(parallelism) .name("Degree count"); - if (includeZeroDegreeVertices) { + if (includeZeroDegreeVertices.get()) { sourceDegree = input.getVertices() .leftOuterJoin(sourceDegree) .where(0) http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index f27ea54..1f78566 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -24,9 +24,10 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -41,10 +42,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeDegreePair<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> { // Optional configuration - protected boolean reduceOnTargetId = false; + private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); private int parallelism = PARALLELISM_DEFAULT; @@ -58,7 +59,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV * @return this */ public EdgeDegreePair<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) { - this.reduceOnTargetId = reduceOnTargetId; + this.reduceOnTargetId.set(reduceOnTargetId); return this; } @@ -79,18 +80,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV } @Override - public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return EdgeDegreePair.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { + return false; + } + + EdgeDegreePair rhs = (EdgeDegreePair) other; + + reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, t, d(s) DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = input .run(new EdgeSourceDegree<K, VV, EV>() - .setReduceOnTargetId(reduceOnTargetId) + .setReduceOnTargetId(reduceOnTargetId.get()) .setParallelism(parallelism)); // t, d(t) DataSet<Vertex<K, LongValue>> vertexDegrees = input .run(new VertexDegree<K, VV, EV>() - .setReduceOnTargetId(reduceOnTargetId) + .setReduceOnTargetId(reduceOnTargetId.get()) .setParallelism(parallelism)); // s, t, (d(s), d(t)) http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index 2bba645..520723c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -39,10 +40,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeSourceDegree<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> { // Optional configuration - private boolean reduceOnTargetId = false; + private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); private int parallelism = PARALLELISM_DEFAULT; @@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { * @return this */ public EdgeSourceDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) { - this.reduceOnTargetId = reduceOnTargetId; + this.reduceOnTargetId.set(reduceOnTargetId); return this; } @@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { } @Override - public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return EdgeSourceDegree.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { + return false; + } + + EdgeSourceDegree rhs = (EdgeSourceDegree) other; + + reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, d(s) DataSet<Vertex<K, LongValue>> vertexDegrees = input .run(new VertexDegree<K, VV, EV>() - .setReduceOnTargetId(reduceOnTargetId) + .setReduceOnTargetId(reduceOnTargetId.get()) .setParallelism(parallelism)); // s, t, d(s) http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index 6edaf17..123c1dc 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -39,10 +40,10 @@ 
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeTargetDegree<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> { // Optional configuration - private boolean reduceOnSourceId = false; + private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false); private int parallelism = PARALLELISM_DEFAULT; @@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { * @return this */ public EdgeTargetDegree<K, VV, EV> setReduceOnSourceId(boolean reduceOnSourceId) { - this.reduceOnSourceId = reduceOnSourceId; + this.reduceOnSourceId.set(reduceOnSourceId); return this; } @@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> { } @Override - public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return EdgeTargetDegree.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
EdgeTargetDegree.class.isAssignableFrom(other.getClass())) { + return false; + } + + EdgeTargetDegree rhs = (EdgeTargetDegree) other; + + reduceOnSourceId.mergeWith(rhs.reduceOnSourceId); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input) throws Exception { // t, d(t) DataSet<Vertex<K, LongValue>> vertexDegrees = input .run(new VertexDegree<K, VV, EV>() - .setReduceOnTargetId(!reduceOnSourceId) + .setReduceOnTargetId(!reduceOnSourceId.get()) .setParallelism(parallelism)); // s, t, d(t) http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index a2c8e03..ec72222 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -20,10 +20,11 @@ package org.apache.flink.graph.asm.degree.annotate.undirected; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId; @@ -41,12 +42,12 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexDegree<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { // Optional configuration - private boolean includeZeroDegreeVertices = false; + private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); - private boolean reduceOnTargetId = false; + private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); private int parallelism = PARALLELISM_DEFAULT; @@ -60,7 +61,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { * @return this */ public VertexDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { - this.includeZeroDegreeVertices = includeZeroDegreeVertices; + this.includeZeroDegreeVertices.set(includeZeroDegreeVertices); return this; } @@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { * @return this */ public VertexDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) { - this.reduceOnTargetId = reduceOnTargetId; + this.reduceOnTargetId.set(reduceOnTargetId); return this; } @@ -96,9 +97,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { } @Override - public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return VertexDegree.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
VertexDegree.class.isAssignableFrom(other.getClass())) { + return false; + } + + VertexDegree rhs = (VertexDegree) other; + + // verify that configurations can be merged + + if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) { + return false; + } + + // merge configurations + + includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); + reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input) throws Exception { - MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId ? + MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ? new MapEdgeToTargetId<K, EV>() : new MapEdgeToSourceId<K, EV>(); // v @@ -115,8 +146,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> { .setParallelism(parallelism) .name("Degree count"); - if (includeZeroDegreeVertices) { - degree = input.getVertices() + if (includeZeroDegreeVertices.get()) { + degree = input + .getVertices() .leftOuterJoin(degree) .where(0) .equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java index e7d78bb..f9cfae9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java @@ -27,9 +27,10 @@ import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -46,15 +47,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class MaximumDegree<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { +extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { // Required configuration private long maximumDegree; // Optional configuration - private boolean reduceOnTargetId = false; + private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); - private boolean broadcastHighDegreeVertices = false; + private OptionalBoolean broadcastHighDegreeVertices = new OptionalBoolean(false, false); private int parallelism = PARALLELISM_DEFAULT; @@ -79,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { * @return this */ public MaximumDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) { - this.reduceOnTargetId = reduceOnTargetId; + this.reduceOnTargetId.set(reduceOnTargetId); return this; } @@ -96,7 +97,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { * @return this */ public MaximumDegree<K, VV, EV> setBroadcastHighDegreeVertices(boolean broadcastHighDegreeVertices) { - this.broadcastHighDegreeVertices = broadcastHighDegreeVertices; + this.broadcastHighDegreeVertices.set(broadcastHighDegreeVertices); return this; } @@ -113,6 +114,36 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, 
EV>> { return this; } + @Override + protected String getAlgorithmName() { + return MaximumDegree.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + Preconditions.checkNotNull(other); + + if (! MaximumDegree.class.isAssignableFrom(other.getClass())) { + return false; + } + + MaximumDegree rhs = (MaximumDegree) other; + + // verify that configurations can be merged + + if (maximumDegree != rhs.maximumDegree) { + return false; + } + + // merge configurations + + reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); + broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + /* * Implementation notes: * @@ -121,12 +152,12 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { */ @Override - public Graph<K, VV, EV> run(Graph<K, VV, EV> input) + public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input) throws Exception { // u, d(u) DataSet<Vertex<K, LongValue>> vertexDegree = input .run(new VertexDegree<K, VV, EV>() - .setReduceOnTargetId(reduceOnTargetId) + .setReduceOnTargetId(reduceOnTargetId.get()) .setParallelism(parallelism)); // u, d(u) if d(u) > maximumDegree @@ -135,7 +166,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { .setParallelism(parallelism) .name("Filter high-degree vertices"); - JoinHint joinHint = broadcastHighDegreeVertices ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND; + JoinHint joinHint = broadcastHighDegreeVertices.get() ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND; // Vertices DataSet<Vertex<K, VV>> vertices = input @@ -151,17 +182,17 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { DataSet<Edge<K, EV>> edges = input .getEdges() .leftOuterJoin(highDegreeVertices, joinHint) - .where(reduceOnTargetId ? 1 : 0) + .where(reduceOnTargetId.get() ? 
1 : 0) .equalTo(0) .with(new ProjectEdge<K, EV>()) .setParallelism(parallelism) - .name("Project low-degree edges by " + (reduceOnTargetId ? "target" : "source")) + .name("Project low-degree edges by " + (reduceOnTargetId.get() ? "target" : "source")) .leftOuterJoin(highDegreeVertices, joinHint) - .where(reduceOnTargetId ? 0 : 1) + .where(reduceOnTargetId.get() ? 0 : 1) .equalTo(0) .with(new ProjectEdge<K, EV>()) .setParallelism(parallelism) - .name("Project low-degree edges by " + (reduceOnTargetId ? "source" : "target")); + .name("Project low-degree edges by " + (reduceOnTargetId.get() ? "source" : "target")); // Graph return Graph.fromDataSet(vertices, edges, input.getContext()); http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java index 7362a3e..a7fa9b6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; import org.apache.flink.types.CopyableValue; import org.apache.flink.util.Preconditions; @@ -38,7 +38,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV> -implements 
GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { +extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -59,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { } @Override - public Graph<K, VV, EV> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return Simplify.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + Preconditions.checkNotNull(other); + + if (! Simplify.class.isAssignableFrom(other.getClass())) { + return false; + } + + Simplify rhs = (Simplify) other; + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input) throws Exception { // Edges DataSet<Edge<K, EV>> edges = input http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java index 13ac470..d006756 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; import org.apache.flink.types.CopyableValue; import org.apache.flink.util.Collector; import 
org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV> -implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { +extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { // Required configuration private boolean clipAndFlip; @@ -77,7 +77,35 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> { } @Override - public Graph<K, VV, EV> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return Simplify.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + Preconditions.checkNotNull(other); + + if (! Simplify.class.isAssignableFrom(other.getClass())) { + return false; + } + + Simplify rhs = (Simplify) other; + + // verify that configurations can be merged + + if (clipAndFlip != rhs.clipAndFlip) { + return false; + } + + // merge configurations + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input) throws Exception { // Edges DataSet<Edge<K, EV>> edges = input http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java index 47ec077..6003c9a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java @@ -21,7 +21,7 @@ package 
org.apache.flink.graph.asm.translate; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues * @param <NEW> new edge value type */ public class TranslateEdgeValues<K, VV, OLD, NEW> -implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> { +extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> { // Required configuration private TranslateFunction<OLD,NEW> translator; @@ -71,7 +71,36 @@ implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> { } @Override - public Graph<K, VV, NEW> run(Graph<K, VV, OLD> input) throws Exception { + protected String getAlgorithmName() { + return TranslateEdgeValues.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + Preconditions.checkNotNull(other); + + if (! 
TranslateEdgeValues.class.isAssignableFrom(other.getClass())) { + return false; + } + + TranslateEdgeValues rhs = (TranslateEdgeValues) other; + + // verify that configurations can be merged + + if (translator != rhs.translator) { + return false; + } + + // merge configurations + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public Graph<K, VV, NEW> runInternal(Graph<K, VV, OLD> input) + throws Exception { DataSet<Edge<K, NEW>> translatedEdges = translateEdgeValues(input.getEdges(), translator, parallelism); return Graph.fromDataSet(input.getVertices(), translatedEdges, input.getContext()); http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java index 6a06feb..6ea56eb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java @@ -21,8 +21,8 @@ package org.apache.flink.graph.asm.translate; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -38,7 +38,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds; * @param <EV> edge value type */ public class TranslateGraphIds<OLD, NEW, VV, EV> -implements 
GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> { +extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> { // Required configuration private TranslateFunction<OLD,NEW> translator; @@ -73,7 +73,36 @@ implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> { } @Override - public Graph<NEW, VV, EV> run(Graph<OLD, VV, EV> input) throws Exception { + protected String getAlgorithmName() { + return TranslateGraphIds.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + Preconditions.checkNotNull(other); + + if (! TranslateGraphIds.class.isAssignableFrom(other.getClass())) { + return false; + } + + TranslateGraphIds rhs = (TranslateGraphIds) other; + + // verify that configurations can be merged + + if (translator != rhs.translator) { + return false; + } + + // merge configurations + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public Graph<NEW, VV, EV> runInternal(Graph<OLD, VV, EV> input) + throws Exception { // Vertices DataSet<Vertex<NEW, VV>> translatedVertices = translateVertexIds(input.getVertices(), translator, parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java index 3d0133a..3a49324 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java @@ -20,8 +20,8 @@ package org.apache.flink.graph.asm.translate; import org.apache.flink.api.java.DataSet; import 
org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexValu * @param <EV> edge value type */ public class TranslateVertexValues<K, OLD, NEW, EV> -implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> { +extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> { // Required configuration private TranslateFunction<OLD, NEW> translator; @@ -71,7 +71,36 @@ implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> { } @Override - public Graph<K, NEW, EV> run(Graph<K, OLD, EV> input) throws Exception { + protected String getAlgorithmName() { + return TranslateVertexValues.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + Preconditions.checkNotNull(other); + + if (! 
TranslateVertexValues.class.isAssignableFrom(other.getClass())) { + return false; + } + + TranslateVertexValues rhs = (TranslateVertexValues) other; + + // verify that configurations can be merged + + if (translator != rhs.translator) { + return false; + } + + // merge configurations + + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + @Override + public Graph<K, NEW, EV> runInternal(Graph<K, OLD, EV> input) + throws Exception { DataSet<Vertex<K, NEW>> translatedVertices = translateVertexValues(input.getVertices(), translator, parallelism); return Graph.fromDataSet(translatedVertices, input.getEdges(), input.getContext()); http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index bbc167e..537ad0f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -25,12 +25,12 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { // Optional configuration private int littleParallelism = PARALLELISM_DEFAULT; @@ -74,6 +74,25 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { return this; } + @Override + protected String getAlgorithmName() { + return LocalClusteringCoefficient.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) { + return false; + } + + LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other; + + littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + + return true; + } /* * Implementation notes: @@ -86,12 +105,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { */ @Override - public DataSet<Result<K>> run(Graph<K, VV, EV> input) + public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception { // u, v, w, bitmask DataSet<TriangleListing.Result<K>> triangles = input .run(new TriangleListing<K,VV,EV>() - .setSortTriangleVertices(false) .setLittleParallelism(littleParallelism)); // u, edge count http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java index 5c364f5..14c731a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -33,9 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeOrder; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result; +import 
org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.ByteValue; import org.apache.flink.types.CopyableValue; import org.apache.flink.util.Collector; @@ -60,10 +62,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { // Optional configuration - private boolean sortTriangleVertices = false; + private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false); private int littleParallelism = PARALLELISM_DEFAULT; @@ -75,7 +77,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> { * @return this */ public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) { - this.sortTriangleVertices = sortTriangleVertices; + this.sortTriangleVertices.set(sortTriangleVertices); return this; } @@ -95,6 +97,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> { return this; } + @Override + protected String getAlgorithmName() { + return TriangleListing.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
TriangleListing.class.isAssignableFrom(other.getClass())) { + return false; + } + + TriangleListing rhs = (TriangleListing) other; + + sortTriangleVertices.mergeWith(rhs.sortTriangleVertices); + littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + + return true; + } + /* * Implementation notes: * @@ -106,7 +129,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> { */ @Override - public DataSet<Result<K>> run(Graph<K, VV, EV> input) + public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception { // u, v, bitmask where u < v DataSet<Tuple3<K, K, ByteValue>> filteredByID = input @@ -151,7 +174,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> { .setParallelism(littleParallelism) .name("Triangle listing"); - if (sortTriangleVertices) { + if (sortTriangleVertices.get()) { triangles = triangles .map(new SortTriangleVertices<K>()) .name("Sort triangle vertices"); http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 6858818..8f707fd 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -26,11 +26,11 @@ import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Graph; 
-import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { // Optional configuration private int littleParallelism = PARALLELISM_DEFAULT; @@ -75,6 +75,26 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { return this; } + @Override + protected String getAlgorithmName() { + return LocalClusteringCoefficient.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) { + return false; + } + + LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other; + + littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + + return true; + } + /* * Implementation notes: * @@ -86,12 +106,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { */ @Override - public DataSet<Result<K>> run(Graph<K, VV, EV> input) + public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception { // u, v, w DataSet<Tuple3<K,K,K>> triangles = input .run(new TriangleListing<K,VV,EV>() - .setSortTriangleVertices(false) .setLittleParallelism(littleParallelism)); // u, 1 http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 4f8ce7a..89b86fe 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -32,8 +32,9 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; 
import org.apache.flink.util.Collector; @@ -62,10 +63,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> { // Optional configuration - private boolean sortTriangleVertices = false; + private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false); private int littleParallelism = PARALLELISM_DEFAULT; @@ -77,7 +78,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { * @return this */ public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) { - this.sortTriangleVertices = sortTriangleVertices; + this.sortTriangleVertices.set(sortTriangleVertices); return this; } @@ -97,6 +98,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { return this; } + @Override + protected String getAlgorithmName() { + return TriangleListing.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
TriangleListing.class.isAssignableFrom(other.getClass())) { + return false; + } + + TriangleListing rhs = (TriangleListing) other; + + sortTriangleVertices.mergeWith(rhs.sortTriangleVertices); + littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + + return true; + } + /* * Implementation notes: * @@ -108,7 +130,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { */ @Override - public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) + public DataSet<Tuple3<K, K, K>> runInternal(Graph<K, VV, EV> input) throws Exception { // u, v where u < v DataSet<Tuple2<K, K>> filteredByID = input @@ -145,7 +167,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { .setParallelism(littleParallelism) .name("Triangle listing"); - if (sortTriangleVertices) { + if (sortTriangleVertices.get()) { triangles = triangles .map(new SortTriangleVertices<K>()) .name("Sort triangle vertices"); http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 00035e4..b88badb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -36,9 +36,10 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.library.link_analysis.HITS.Result; import org.apache.flink.graph.utils.Murmur3_32; 
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.types.DoubleValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -62,7 +63,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class HITS<K, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { private static final String CHANGE_IN_SCORES = "change in scores"; @@ -128,7 +129,32 @@ implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> { } @Override - public DataSet<Result<K>> run(Graph<K, VV, EV> input) + protected String getAlgorithmName() { + return HITS.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! HITS.class.isAssignableFrom(other.getClass())) { + return false; + } + + HITS rhs = (HITS) other; + + // merge configurations + + maxIterations = Math.max(maxIterations, rhs.maxIterations); + convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold); + parallelism = Math.min(parallelism, rhs.parallelism); + + return true; + } + + + @Override + public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception { DataSet<Tuple2<K, K>> edges = input .getEdges() http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index a164a5f..512a7a0 100644 --- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -33,11 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.library.similarity.AdamicAdar.Result; import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.FloatValue; import org.apache.flink.types.IntValue; @@ -71,7 +71,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class AdamicAdar<K extends CopyableValue<K>, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { private static final int GROUP_SIZE = 64; @@ -127,6 +127,35 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { return this; } + @Override + protected String getAlgorithmName() { + return AdamicAdar.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! 
AdamicAdar.class.isAssignableFrom(other.getClass())) { + return false; + } + + AdamicAdar rhs = (AdamicAdar) other; + + // verify that configurations can be merged + + if (minimumRatio != rhs.minimumRatio || + minimumScore != rhs.minimumScore) { + return false; + } + + // merge configurations + + littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + + return true; + } + /* * Implementation notes: * @@ -136,7 +165,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { */ @Override - public DataSet<Result<K>> run(Graph<K, VV, EV> input) + public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, d(s), 1/log(d(s)) DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index c731984..7783e6b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -28,10 +28,10 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.types.CopyableValue; import 
org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -61,7 +61,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class JaccardIndex<K extends CopyableValue<K>, VV, EV> -implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { +extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { public static final int DEFAULT_GROUP_SIZE = 64; @@ -153,6 +153,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { return this; } + @Override + protected String getAlgorithmName() { + return JaccardIndex.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + Preconditions.checkNotNull(other); + + if (! JaccardIndex.class.isAssignableFrom(other.getClass())) { + return false; + } + + JaccardIndex rhs = (JaccardIndex) other; + + // verify that configurations can be merged + + if (unboundedScores != rhs.unboundedScores || + minimumScoreNumerator != rhs.minimumScoreNumerator || + minimumScoreDenominator != rhs.minimumScoreDenominator || + maximumScoreNumerator != rhs.maximumScoreNumerator || + maximumScoreDenominator != rhs.maximumScoreDenominator) { + return false; + } + + // merge configurations + + groupSize = Math.max(groupSize, rhs.groupSize); + littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + + return true; + } + /* * Implementation notes: * @@ -162,7 +195,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { */ @Override - public DataSet<Result<K>> run(Graph<K, VV, EV> input) + public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, t, d(t) DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java new file mode 100644 index 0000000..164125c --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils.proxy; + +import javassist.util.proxy.MethodFilter; +import javassist.util.proxy.MethodHandler; +import javassist.util.proxy.ProxyFactory; +import javassist.util.proxy.ProxyObject; +import org.objenesis.ObjenesisStd; + +import java.lang.reflect.Method; + +/** + * Wraps an object with a proxy delegate whose method handler invokes all + * method calls on the wrapped object. This object can be later replaced. + * + * @param <X> + */ +public class Delegate<X> { + private X obj; + + private X proxy = null; + + /** + * Set the initial delegated object. + * + * @param obj delegated object + */ + public Delegate(X obj) { + setObject(obj); + } + + /** + * Change the delegated object. 
+ * + * @param obj delegated object + */ + public void setObject(X obj) { + this.obj = obj; + } + + /** + * Instantiates and returns a proxy object which subclasses the + * delegated object. The proxy's method handler invokes all methods + * on the delegated object that is set at the time of invocation. + * + * @return delegating proxy + */ + @SuppressWarnings("unchecked") + public X getProxy() { + if (proxy != null) { + return proxy; + } + + ProxyFactory factory = new ProxyFactory(); + factory.setSuperclass(obj.getClass()); + + // create the class and instantiate an instance without calling a constructor + Class<? extends X> proxyClass = factory.createClass(new MethodFilter() { + @Override + public boolean isHandled(Method method) { + return true; + } + }); + proxy = new ObjenesisStd().newInstance(proxyClass); + + // create and set a handler to invoke all method calls on the delegated object + ((ProxyObject) proxy).setHandler(new MethodHandler() { + @Override + public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable { + // method visibility may be restricted + thisMethod.setAccessible(true); + return thisMethod.invoke(obj, args); + } + }); + + return proxy; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java new file mode 100644 index 0000000..8e796e6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils.proxy; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of + * type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the resultant + * {@link DataSet} with a delegating proxy object. The delegated object can be + * replaced when the same algorithm is run on the same input with a mergeable + * configuration. This allows algorithms to be composed of implicitly reusable + * algorithms without publicly sharing intermediate {@link DataSet}s. 
+ * + * @param <K> ID type + * @param <VV> vertex value type + * @param <EV> edge value type + * @param <T> output type + */ +public abstract class GraphAlgorithmDelegatingDataSet<K, VV, EV, T> +implements GraphAlgorithm<K, VV, EV, DataSet<T>> { + + // each algorithm and input pair may map to multiple configurations + private static Map<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>> cache = + Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>>()); + + private Graph<K,VV,EV> input; + + private Delegate<DataSet<T>> delegate; + + /** + * Algorithms are identified by name rather than by class to allow subclassing. + * + * @return name of the algorithm, which may be shared by multiple classes + * implementing the same algorithm and generating the same output + */ + protected abstract String getAlgorithmName(); + + /** + * An algorithm must first test whether the configurations can be merged + * before merging individual fields. + * + * @param other the algorithm with which to compare and merge + * @return true if and only if configuration has been merged and the + * algorithm's output can be reused + */ + protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other); + + /** + * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}. + * + * @param input the input graph + * @return the algorithm's output + * @throws Exception + */ + protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception; + + @Override + public final int hashCode() { + return new HashCodeBuilder(17, 37) + .append(input) + .append(getAlgorithmName()) + .toHashCode(); + } + + @Override + public final boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (! 
GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) { + return false; + } + + GraphAlgorithmDelegatingDataSet rhs = (GraphAlgorithmDelegatingDataSet) obj; + + return new EqualsBuilder() + .append(input, rhs.input) + .append(getAlgorithmName(), rhs.getAlgorithmName()) + .isEquals(); + } + + @Override + @SuppressWarnings("unchecked") + public final DataSet<T> run(Graph<K, VV, EV> input) + throws Exception { + this.input = input; + + if (cache.containsKey(this)) { + for (GraphAlgorithmDelegatingDataSet<K, VV, EV, T> other : cache.get(this)) { + if (mergeConfiguration(other)) { + // configuration has been merged so generate new output + DataSet<T> output = runInternal(input); + + // update delegatee object and reuse delegate + other.delegate.setObject(output); + delegate = other.delegate; + + return delegate.getProxy(); + } + } + } + + // no mergeable configuration found so generate new output + DataSet<T> output = runInternal(input); + + // create a new delegate to wrap the algorithm output + delegate = new Delegate<>(output); + + // cache this result + if (cache.containsKey(this)) { + cache.get(this).add(this); + } else { + cache.put(this, new ArrayList(Collections.singletonList(this))); + } + + return delegate.getProxy(); + } +}
