[FLINK-4257] [gelly] Handle delegating algorithm change of class. Replaces Delegate with NoOpOperator.
This closes #2474 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8210ff46 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8210ff46 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8210ff46 Branch: refs/heads/master Commit: 8210ff468d64fc50520011fc6fed9909d2a6b89a Parents: 95ad865 Author: Greg Hogan <[email protected]> Authored: Mon Jul 25 09:09:27 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Thu Sep 8 17:06:29 2016 -0400 ---------------------------------------------------------------------- .../annotate/directed/EdgeDegreesPair.java | 6 +- .../annotate/directed/EdgeSourceDegrees.java | 6 +- .../annotate/directed/EdgeTargetDegrees.java | 6 +- .../degree/annotate/directed/VertexDegrees.java | 6 +- .../annotate/directed/VertexInDegree.java | 6 +- .../annotate/directed/VertexOutDegree.java | 6 +- .../annotate/undirected/EdgeDegreePair.java | 6 +- .../annotate/undirected/EdgeSourceDegree.java | 6 +- .../annotate/undirected/EdgeTargetDegree.java | 6 +- .../annotate/undirected/VertexDegree.java | 6 +- .../degree/filter/undirected/MaximumDegree.java | 6 +- .../graph/asm/simple/directed/Simplify.java | 6 +- .../graph/asm/simple/undirected/Simplify.java | 6 +- .../asm/translate/TranslateEdgeValues.java | 6 +- .../graph/asm/translate/TranslateGraphIds.java | 6 +- .../asm/translate/TranslateVertexValues.java | 6 +- .../directed/LocalClusteringCoefficient.java | 6 +- .../clustering/directed/TriangleListing.java | 6 +- .../undirected/LocalClusteringCoefficient.java | 6 +- .../clustering/undirected/TriangleListing.java | 6 +- .../flink/graph/library/link_analysis/HITS.java | 6 +- .../graph/library/similarity/AdamicAdar.java | 6 +- .../graph/library/similarity/JaccardIndex.java | 6 +- .../flink/graph/utils/proxy/Delegate.java | 112 ------------- .../proxy/GraphAlgorithmDelegatingDataSet.java | 150 ----------------- .../proxy/GraphAlgorithmDelegatingGraph.java | 160 
------------------ .../proxy/GraphAlgorithmWrappingDataSet.java | 151 +++++++++++++++++ .../proxy/GraphAlgorithmWrappingGraph.java | 161 +++++++++++++++++++ 28 files changed, 381 insertions(+), 491 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java index be19613..408516b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java @@ -27,7 +27,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -41,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeDegreesPair<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> { // Optional configuration private int 
parallelism = PARALLELISM_DEFAULT; @@ -64,7 +64,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, D } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java index ee3175e..e55e3c6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java @@ -26,7 +26,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeSourceDegrees<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { +extends 
GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -63,7 +63,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java index 6ba47f2..ed48f98 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java @@ -26,7 +26,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class 
EdgeTargetDegrees<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -63,7 +63,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 84873bc..f4d734e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -33,7 +33,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.ByteValue; import org.apache.flink.types.LongValue; @@ -50,7 +50,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexDegrees<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -90,7 +90,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! VertexDegrees.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index f7ac18b..3f842a6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import 
org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexInDegree<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! VertexInDegree.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index e235f6a..0ec4fc1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexOutDegree<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! 
VertexOutDegree.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index 1f78566..09ef975 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -26,7 +26,7 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -42,7 +42,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeDegreePair<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> { // Optional configuration private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); @@ -85,7 +85,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, } @Override - protected boolean 
mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index 520723c..702fead 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeSourceDegree<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> { // Optional configuration private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); @@ -83,7 
+83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue> } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index 123c1dc..724567e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class EdgeTargetDegree<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, 
LongValue>>> { // Optional configuration private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false); @@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue> } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index 42f084d..0f753fc 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.degree.annotate.undirected; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -43,7 +43,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class VertexDegree<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, 
LongValue>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -103,7 +103,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! VertexDegree.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java index f9cfae9..be19ffd 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java @@ -29,7 +29,7 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -47,7 +47,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class MaximumDegree<K, VV, EV> -extends 
GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { +extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { // Required configuration private long maximumDegree; @@ -120,7 +120,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! MaximumDegree.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java index 983dac9..99ffe0d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.types.CopyableValue; import org.apache.flink.util.Preconditions; @@ -36,7 +36,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { +extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { // Optional configuration private int 
parallelism = PARALLELISM_DEFAULT; @@ -62,7 +62,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! Simplify.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java index ce78cfa..45cd3f9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.types.CopyableValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -38,7 +38,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { +extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { // Required configuration private boolean clipAndFlip; @@ -80,7 +80,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> { 
} @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! Simplify.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java index 6003c9a..bde826e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java @@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues * @param <NEW> new edge value type */ public class TranslateEdgeValues<K, VV, OLD, NEW> -extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> { +extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> { // Required configuration private TranslateFunction<OLD,NEW> translator; @@ -76,7 +76,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean 
mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! TranslateEdgeValues.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java index 6ea56eb..2c67c5a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java @@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -38,7 +38,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds; * @param <EV> edge value type */ public class TranslateGraphIds<OLD, NEW, VV, EV> -extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> { +extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> { // Required configuration private TranslateFunction<OLD,NEW> translator; @@ -78,7 +78,7 @@ extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! 
TranslateGraphIds.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java index 3a49324..9e6784e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java @@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexValu * @param <EV> edge value type */ public class TranslateVertexValues<K, OLD, NEW, EV> -extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> { +extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> { // Required configuration private TranslateFunction<OLD, NEW> translator; @@ -76,7 +76,7 @@ extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! 
TranslateVertexValues.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index 22c8b41..9d323a8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -31,8 +31,8 @@ import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -57,7 +57,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true); @@ -99,7 
+99,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java index 14c731a..7df288a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -36,7 +36,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.ByteValue; import org.apache.flink.types.CopyableValue; @@ -62,7 +62,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { +extends 
GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // Optional configuration private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false); @@ -103,7 +103,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! TriangleListing.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 4b4bf07..293e3f9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -31,8 +31,8 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -57,7 +57,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true); @@ -100,7 +100,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 89b86fe..c3dbf3e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -33,7 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import 
org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; @@ -63,7 +63,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> { // Optional configuration private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false); @@ -104,7 +104,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! TriangleListing.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 60e99bd..7ba6fee 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -40,7 +40,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.library.link_analysis.HITS.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.DoubleValue; import 
org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -64,7 +64,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class HITS<K, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { private static final String CHANGE_IN_SCORES = "change in scores"; @@ -135,7 +135,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! HITS.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index 512a7a0..1514866 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -37,7 +37,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.library.similarity.AdamicAdar.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.FloatValue; import org.apache.flink.types.IntValue; @@ -71,7 
+71,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class AdamicAdar<K extends CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { private static final int GROUP_SIZE = 64; @@ -133,7 +133,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! AdamicAdar.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 7783e6b..1e406fa 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -31,7 +31,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -61,7 +61,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @param <EV> edge value type */ public class JaccardIndex<K extends CopyableValue<K>, VV, EV> -extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { public static final int DEFAULT_GROUP_SIZE = 64; @@ -159,7 +159,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! JaccardIndex.class.isAssignableFrom(other.getClass())) { http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java deleted file mode 100644 index a2d724d..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.utils.proxy; - -import javassist.util.proxy.MethodFilter; -import javassist.util.proxy.MethodHandler; -import javassist.util.proxy.ProxyFactory; -import javassist.util.proxy.ProxyObject; -import org.objenesis.ObjenesisStd; - -import java.lang.reflect.Method; - -/** - * Wraps an object with a proxy delegate whose method handler invokes all - * method calls on the wrapped object. This object can be later replaced. - * - * @param <X> the type of the proxied object - */ -public class Delegate<X> { - private X obj; - - private X proxy = null; - - /** - * Set the initial delegated object. - * - * @param obj delegated object - */ - public Delegate(X obj) { - setObject(obj); - } - - /** - * Change the delegated object. - * - * @param obj delegated object - */ - public void setObject(X obj) { - this.obj = (obj instanceof ReferentProxy) ? ((ReferentProxy<X>) obj).getProxiedObject() : obj; - } - - /** - * Instantiates and returns a proxy object which subclasses the - * delegated object. The proxy's method handler invokes all methods - * on the delegated object that is set at the time of invocation. - * - * @return delegating proxy - */ - @SuppressWarnings("unchecked") - public X getProxy() { - if (proxy != null) { - return proxy; - } - - ProxyFactory factory = new ProxyFactory(); - factory.setSuperclass(obj.getClass()); - factory.setInterfaces(new Class[]{ReferentProxy.class}); - - // create the class and instantiate an instance without calling a constructor - Class<? 
extends X> proxyClass = factory.createClass(new MethodFilter() { - @Override - public boolean isHandled(Method method) { - return true; - } - }); - proxy = new ObjenesisStd().newInstance(proxyClass); - - // create and set a handler to invoke all method calls on the delegated object - ((ProxyObject) proxy).setHandler(new MethodHandler() { - @Override - public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable { - if (thisMethod.getName().equals("getProxiedObject")) { - // this method is provided by the ReferentProxy interface - return obj; - } else { - // method visibility may be restricted - thisMethod.setAccessible(true); - return thisMethod.invoke(obj, args); - } - } - }); - - return proxy; - } - - /** - * This interface provides access via the proxy handler to the original - * object being proxied. This is necessary since we cannot and should not - * create a proxy of a proxy but must instead proxy the original object. - * - * @param <Y> the type of the proxied object - */ - protected interface ReferentProxy<Y> { - Y getProxiedObject(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java deleted file mode 100644 index 8e796e6..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.utils.proxy; - -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of - * type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the resultant - * {@link DataSet} with a delegating proxy object. The delegated object can be - * replaced when the same algorithm is run on the same input with a mergeable - * configuration. This allows algorithms to be composed of implicitly reusable - * algorithms without publicly sharing intermediate {@link DataSet}s. 
- * - * @param <K> ID type - * @param <VV> vertex value type - * @param <EV> edge value type - * @param <T> output type - */ -public abstract class GraphAlgorithmDelegatingDataSet<K, VV, EV, T> -implements GraphAlgorithm<K, VV, EV, DataSet<T>> { - - // each algorithm and input pair may map to multiple configurations - private static Map<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>> cache = - Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>>()); - - private Graph<K,VV,EV> input; - - private Delegate<DataSet<T>> delegate; - - /** - * Algorithms are identified by name rather than by class to allow subclassing. - * - * @return name of the algorithm, which may be shared by multiple classes - * implementing the same algorithm and generating the same output - */ - protected abstract String getAlgorithmName(); - - /** - * An algorithm must first test whether the configurations can be merged - * before merging individual fields. - * - * @param other the algorithm with which to compare and merge - * @return true if and only if configuration has been merged and the - * algorithm's output can be reused - */ - protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other); - - /** - * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}. - * - * @param input the input graph - * @return the algorithm's output - * @throws Exception - */ - protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception; - - @Override - public final int hashCode() { - return new HashCodeBuilder(17, 37) - .append(input) - .append(getAlgorithmName()) - .toHashCode(); - } - - @Override - public final boolean equals(Object obj) { - if (obj == null) { - return false; - } - - if (obj == this) { - return true; - } - - if (! 
GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) { - return false; - } - - GraphAlgorithmDelegatingDataSet rhs = (GraphAlgorithmDelegatingDataSet) obj; - - return new EqualsBuilder() - .append(input, rhs.input) - .append(getAlgorithmName(), rhs.getAlgorithmName()) - .isEquals(); - } - - @Override - @SuppressWarnings("unchecked") - public final DataSet<T> run(Graph<K, VV, EV> input) - throws Exception { - this.input = input; - - if (cache.containsKey(this)) { - for (GraphAlgorithmDelegatingDataSet<K, VV, EV, T> other : cache.get(this)) { - if (mergeConfiguration(other)) { - // configuration has been merged so generate new output - DataSet<T> output = runInternal(input); - - // update delegatee object and reuse delegate - other.delegate.setObject(output); - delegate = other.delegate; - - return delegate.getProxy(); - } - } - } - - // no mergeable configuration found so generate new output - DataSet<T> output = runInternal(input); - - // create a new delegate to wrap the algorithm output - delegate = new Delegate<>(output); - - // cache this result - if (cache.containsKey(this)) { - cache.get(this).add(this); - } else { - cache.put(this, new ArrayList(Collections.singletonList(this))); - } - - return delegate.getProxy(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java deleted file mode 100644 index 705510a..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.utils.proxy; - -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of - * type {@code T}. A {@code GraphAlgorithmDelegatingGraph} wraps the resultant - * {@link Graph} with a delegating proxy object. The delegated object can be - * replaced when the same algorithm is run on the same input with a mergeable - * configuration. This allows algorithms to be composed of implicitly reusable - * algorithms without publicly sharing intermediate {@link DataSet}s. 
- * - * @param <IN_K> input ID type - * @param <IN_VV> input vertex value type - * @param <IN_EV> input edge value type - * @param <OUT_K> output ID type - * @param <OUT_VV> output vertex value type - * @param <OUT_EV> output edge value type - */ -public abstract class GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> -implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> { - - // each algorithm and input pair may map to multiple configurations - private static Map<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>> cache = - Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>>()); - - private Graph<IN_K, IN_VV, IN_EV> input; - - private Delegate<DataSet<Vertex<OUT_K, OUT_VV>>> verticesDelegate; - - private Delegate<DataSet<Edge<OUT_K, OUT_EV>>> edgesDelegate; - - /** - * Algorithms are identified by name rather than by class to allow subclassing. - * - * @return name of the algorithm, which may be shared by multiple classes - * implementing the same algorithm and generating the same output - */ - protected abstract String getAlgorithmName(); - - /** - * An algorithm must first test whether the configurations can be merged - * before merging individual fields. - * - * @param other the algorithm with which to compare and merge - * @return true if and only if configuration has been merged and the - * algorithm's output can be reused - */ - protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other); - - /** - * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}. 
- * - * @param input the input graph - * @return the algorithm's output - * @throws Exception - */ - protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, IN_VV, IN_EV> input) throws Exception; - - @Override - public final int hashCode() { - return new HashCodeBuilder(17, 37) - .append(input) - .append(getAlgorithmName()) - .toHashCode(); - } - - @Override - public final boolean equals(Object obj) { - if (obj == null) { - return false; - } - - if (obj == this) { - return true; - } - - if (! GraphAlgorithmDelegatingGraph.class.isAssignableFrom(obj.getClass())) { - return false; - } - - GraphAlgorithmDelegatingGraph rhs = (GraphAlgorithmDelegatingGraph) obj; - - return new EqualsBuilder() - .append(input, rhs.input) - .append(getAlgorithmName(), rhs.getAlgorithmName()) - .isEquals(); - } - - @Override - @SuppressWarnings("unchecked") - public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> input) - throws Exception { - this.input = input; - - if (cache.containsKey(this)) { - for (GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) { - if (mergeConfiguration(other)) { - // configuration has been merged so generate new output - Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input); - - // update delegatee object and reuse delegate - other.verticesDelegate.setObject(output.getVertices()); - verticesDelegate = other.verticesDelegate; - - other.edgesDelegate.setObject(output.getEdges()); - edgesDelegate = other.edgesDelegate; - - return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext()); - } - } - } - - // no mergeable configuration found so generate new output - Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input); - - // create a new delegate to wrap the algorithm output - verticesDelegate = new Delegate<>(output.getVertices()); - edgesDelegate = new Delegate<>(output.getEdges()); - - // cache this result - if (cache.containsKey(this)) { - 
cache.get(this).add(this); - } else { - cache.put(this, new ArrayList(Collections.singletonList(this))); - } - - return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java new file mode 100644 index 0000000..7a4a0e6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.utils.proxy; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.NoOpOperator; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of + * type {@code T}. A {@code GraphAlgorithmWrappingDataSet} wraps the resultant + * {@link DataSet} with a {@code NoOpOperator}. The input to the wrapped + * operator can be replaced when the same algorithm is run on the same input + * with a mergeable configuration. This allows algorithms to be composed of + * implicitly reusable algorithms without publicly sharing intermediate + * {@link DataSet}s. + * + * @param <K> ID type + * @param <VV> vertex value type + * @param <EV> edge value type + * @param <T> output type + */ +public abstract class GraphAlgorithmWrappingDataSet<K, VV, EV, T> +implements GraphAlgorithm<K, VV, EV, DataSet<T>> { + + // each algorithm and input pair may map to multiple configurations + private static Map<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>> cache = + Collections.synchronizedMap(new HashMap<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>>()); + + private Graph<K,VV,EV> input; + + private NoOpOperator<T> wrappingOperator; + + /** + * Algorithms are identified by name rather than by class to allow subclassing. + * + * @return name of the algorithm, which may be shared by multiple classes + * implementing the same algorithm and generating the same output + */ + protected abstract String getAlgorithmName(); + + /** + * An algorithm must first test whether the configurations can be merged + * before merging individual fields. 
+ * + * @param other the algorithm with which to compare and merge + * @return true if and only if configuration has been merged and the + * algorithm's output can be reused + */ + protected abstract boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other); + + /** + * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}. + * + * @param input the input graph + * @return the algorithm's output + * @throws Exception + */ + protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception; + + @Override + public final int hashCode() { + return new HashCodeBuilder(17, 37) + .append(input) + .append(getAlgorithmName()) + .toHashCode(); + } + + @Override + public final boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (! GraphAlgorithmWrappingDataSet.class.isAssignableFrom(obj.getClass())) { + return false; + } + + GraphAlgorithmWrappingDataSet rhs = (GraphAlgorithmWrappingDataSet) obj; + + return new EqualsBuilder() + .append(input, rhs.input) + .append(getAlgorithmName(), rhs.getAlgorithmName()) + .isEquals(); + } + + @Override + @SuppressWarnings("unchecked") + public final DataSet<T> run(Graph<K, VV, EV> input) + throws Exception { + this.input = input; + + if (cache.containsKey(this)) { + for (GraphAlgorithmWrappingDataSet<K, VV, EV, T> other : cache.get(this)) { + if (mergeConfiguration(other)) { + // configuration has been merged so generate new output + DataSet<T> output = runInternal(input); + + other.wrappingOperator.setInput(output); + wrappingOperator = other.wrappingOperator; + + return wrappingOperator; + } + } + } + + // no mergeable configuration found so generate new output + DataSet<T> output = runInternal(input); + + // create a new operator to wrap the algorithm output + wrappingOperator = new NoOpOperator<>(output, output.getType()); + + // cache this result + if (cache.containsKey(this)) { + cache.get(this).add(this); + } else 
{ + cache.put(this, new ArrayList(Collections.singletonList(this))); + } + + return wrappingOperator; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java new file mode 100644 index 0000000..69a6c37 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.utils.proxy; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.NoOpOperator; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output + * {@link Graph}. A {@code GraphAlgorithmWrappingGraph} wraps the resultant + * {@link Graph} vertex and edge sets with a {@code NoOpOperator}. The input to + * the wrapping operators can be replaced when the same algorithm is run on the + * same input with a mergeable configuration. This allows algorithms to be + * composed of implicitly reusable algorithms without publicly sharing + * intermediate {@link DataSet}s. 
+ * + * @param <IN_K> input ID type + * @param <IN_VV> input vertex value type + * @param <IN_EV> input edge value type + * @param <OUT_K> output ID type + * @param <OUT_VV> output vertex value type + * @param <OUT_EV> output edge value type + */ +public abstract class GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> +implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> { + + // each algorithm and input pair may map to multiple configurations + private static Map<GraphAlgorithmWrappingGraph, List<GraphAlgorithmWrappingGraph>> cache = + Collections.synchronizedMap(new HashMap<GraphAlgorithmWrappingGraph, List<GraphAlgorithmWrappingGraph>>()); + + private Graph<IN_K, IN_VV, IN_EV> input; + + private NoOpOperator<Vertex<OUT_K, OUT_VV>> verticesWrappingOperator; + + private NoOpOperator<Edge<OUT_K, OUT_EV>> edgesWrappingOperator; + + /** + * Algorithms are identified by name rather than by class to allow subclassing. + * + * @return name of the algorithm, which may be shared by multiple classes + * implementing the same algorithm and generating the same output + */ + protected abstract String getAlgorithmName(); + + /** + * An algorithm must first test whether the configurations can be merged + * before merging individual fields. + * + * @param other the algorithm with which to compare and merge + * @return true if and only if configuration has been merged and the + * algorithm's output can be reused + */ + protected abstract boolean mergeConfiguration(GraphAlgorithmWrappingGraph other); + + /** + * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}. 
+ * + * @param input the input graph + * @return the algorithm's output + * @throws Exception + */ + protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, IN_VV, IN_EV> input) throws Exception; + + @Override + public final int hashCode() { + return new HashCodeBuilder(17, 37) + .append(input) + .append(getAlgorithmName()) + .toHashCode(); + } + + @Override + public final boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (! GraphAlgorithmWrappingGraph.class.isAssignableFrom(obj.getClass())) { + return false; + } + + GraphAlgorithmWrappingGraph rhs = (GraphAlgorithmWrappingGraph) obj; + + return new EqualsBuilder() + .append(input, rhs.input) + .append(getAlgorithmName(), rhs.getAlgorithmName()) + .isEquals(); + } + + @Override + @SuppressWarnings("unchecked") + public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> input) + throws Exception { + this.input = input; + + if (cache.containsKey(this)) { + for (GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) { + if (mergeConfiguration(other)) { + // configuration has been merged so generate new output + Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input); + + other.verticesWrappingOperator.setInput(output.getVertices()); + other.edgesWrappingOperator.setInput(output.getEdges()); + + verticesWrappingOperator = other.verticesWrappingOperator; + edgesWrappingOperator = other.edgesWrappingOperator; + + return Graph.fromDataSet(verticesWrappingOperator, edgesWrappingOperator, output.getContext()); + } + } + } + + // no mergeable configuration found so generate new output + Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input); + + // create a new operator to wrap the algorithm output + verticesWrappingOperator = new NoOpOperator<>(output.getVertices(), output.getVertices().getType()); + edgesWrappingOperator = new NoOpOperator<>(output.getEdges(), 
output.getEdges().getType()); + + // cache this result + if (cache.containsKey(this)) { + cache.get(this).add(this); + } else { + cache.put(this, new ArrayList(Collections.singletonList(this))); + } + + return Graph.fromDataSet(verticesWrappingOperator, edgesWrappingOperator, output.getContext()); + } +}
