[FLINK-4257] [gelly] Handle delegating algorithm change of class

Replaces Delegate with NoOpOperator.

This closes #2474


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8210ff46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8210ff46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8210ff46

Branch: refs/heads/master
Commit: 8210ff468d64fc50520011fc6fed9909d2a6b89a
Parents: 95ad865
Author: Greg Hogan <[email protected]>
Authored: Mon Jul 25 09:09:27 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Thu Sep 8 17:06:29 2016 -0400

----------------------------------------------------------------------
 .../annotate/directed/EdgeDegreesPair.java      |   6 +-
 .../annotate/directed/EdgeSourceDegrees.java    |   6 +-
 .../annotate/directed/EdgeTargetDegrees.java    |   6 +-
 .../degree/annotate/directed/VertexDegrees.java |   6 +-
 .../annotate/directed/VertexInDegree.java       |   6 +-
 .../annotate/directed/VertexOutDegree.java      |   6 +-
 .../annotate/undirected/EdgeDegreePair.java     |   6 +-
 .../annotate/undirected/EdgeSourceDegree.java   |   6 +-
 .../annotate/undirected/EdgeTargetDegree.java   |   6 +-
 .../annotate/undirected/VertexDegree.java       |   6 +-
 .../degree/filter/undirected/MaximumDegree.java |   6 +-
 .../graph/asm/simple/directed/Simplify.java     |   6 +-
 .../graph/asm/simple/undirected/Simplify.java   |   6 +-
 .../asm/translate/TranslateEdgeValues.java      |   6 +-
 .../graph/asm/translate/TranslateGraphIds.java  |   6 +-
 .../asm/translate/TranslateVertexValues.java    |   6 +-
 .../directed/LocalClusteringCoefficient.java    |   6 +-
 .../clustering/directed/TriangleListing.java    |   6 +-
 .../undirected/LocalClusteringCoefficient.java  |   6 +-
 .../clustering/undirected/TriangleListing.java  |   6 +-
 .../flink/graph/library/link_analysis/HITS.java |   6 +-
 .../graph/library/similarity/AdamicAdar.java    |   6 +-
 .../graph/library/similarity/JaccardIndex.java  |   6 +-
 .../flink/graph/utils/proxy/Delegate.java       | 112 -------------
 .../proxy/GraphAlgorithmDelegatingDataSet.java  | 150 -----------------
 .../proxy/GraphAlgorithmDelegatingGraph.java    | 160 ------------------
 .../proxy/GraphAlgorithmWrappingDataSet.java    | 151 +++++++++++++++++
 .../proxy/GraphAlgorithmWrappingGraph.java      | 161 +++++++++++++++++++
 28 files changed, 381 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index be19613..408516b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -27,7 +27,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -41,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeDegreesPair<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -64,7 +64,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, D
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index ee3175e..e55e3c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -26,7 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeSourceDegrees<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -63,7 +63,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>>
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 6ba47f2..ed48f98 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -26,7 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeTargetDegrees<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -63,7 +63,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>>
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 84873bc..f4d734e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -33,7 +33,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.LongValue;
@@ -50,7 +50,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexDegrees<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 
        // Optional configuration
        private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -90,7 +90,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! VertexDegrees.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index f7ac18b..3f842a6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexInDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
        // Optional configuration
        private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! VertexInDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index e235f6a..0ec4fc1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexOutDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
        // Optional configuration
        private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! VertexOutDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 1f78566..09ef975 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -26,7 +26,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
@@ -42,7 +42,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeDegreePair<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
 
        // Optional configuration
        private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
@@ -85,7 +85,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue,
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 520723c..702fead 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeSourceDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
 
        // Optional configuration
        private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 123c1dc..724567e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeTargetDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
 
        // Optional configuration
        private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 42f084d..0f753fc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.degree.annotate.undirected;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -43,7 +43,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
        // Optional configuration
        private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -103,7 +103,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
                Preconditions.checkNotNull(other);
 
                if (! VertexDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index f9cfae9..be19ffd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -29,7 +29,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -47,7 +47,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class MaximumDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
+extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
        // Required configuration
        private long maximumDegree;
@@ -120,7 +120,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
                Preconditions.checkNotNull(other);
 
                if (! MaximumDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 983dac9..99ffe0d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Preconditions;
 
@@ -36,7 +36,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
+extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -62,7 +62,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
                Preconditions.checkNotNull(other);
 
                if (! Simplify.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index ce78cfa..45cd3f9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -38,7 +38,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
+extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
        // Required configuration
        private boolean clipAndFlip;
@@ -80,7 +80,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
                Preconditions.checkNotNull(other);
 
                if (! Simplify.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index 6003c9a..bde826e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues
  * @param <NEW> new edge value type
  */
 public class TranslateEdgeValues<K, VV, OLD, NEW>
-extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> {
+extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 
        // Required configuration
        private TranslateFunction<OLD,NEW> translator;
@@ -76,7 +76,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
                Preconditions.checkNotNull(other);
 
                if (! TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 6ea56eb..2c67c5a 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -38,7 +38,7 @@ import static 
org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
  * @param <EV> edge value type
  */
 public class TranslateGraphIds<OLD, NEW, VV, EV>
-extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> {
+extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 
        // Required configuration
        private TranslateFunction<OLD,NEW> translator;
@@ -78,7 +78,7 @@ extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, 
EV> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) 
{
                Preconditions.checkNotNull(other);
 
                if (! 
TranslateGraphIds.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 3a49324..9e6784e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static 
org.apache.flink.graph.asm.translate.Translate.translateVertexValu
  * @param <EV> edge value type
  */
 public class TranslateVertexValues<K, OLD, NEW, EV>
-extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> {
+extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 
        // Required configuration
        private TranslateFunction<OLD, NEW> translator;
@@ -76,7 +76,7 @@ extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> 
{
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) 
{
                Preconditions.checkNotNull(other);
 
                if (! 
TranslateVertexValues.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 22c8b41..9d323a8 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -31,8 +31,8 @@ import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -57,7 +57,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class LocalClusteringCoefficient<K extends Comparable<K> & 
CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
        private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(true, true);
@@ -99,7 +99,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> 
{
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet 
other) {
                Preconditions.checkNotNull(other);
 
                if (! 
LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 14c731a..7df288a 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -36,7 +36,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import 
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CopyableValue;
@@ -62,7 +62,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, 
EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
        private OptionalBoolean sortTriangleVertices = new 
OptionalBoolean(false, false);
@@ -103,7 +103,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet 
other) {
                Preconditions.checkNotNull(other);
 
                if (! TriangleListing.class.isAssignableFrom(other.getClass())) 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 4b4bf07..293e3f9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -31,8 +31,8 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -57,7 +57,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class LocalClusteringCoefficient<K extends Comparable<K> & 
CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
        private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(true, true);
@@ -100,7 +100,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet 
other) {
                Preconditions.checkNotNull(other);
 
                if (! 
LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 89b86fe..c3dbf3e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
@@ -63,7 +63,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, 
EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 
        // Optional configuration
        private OptionalBoolean sortTriangleVertices = new 
OptionalBoolean(false, false);
@@ -104,7 +104,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Tuple3<K, K, K>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet 
other) {
                Preconditions.checkNotNull(other);
 
                if (! TriangleListing.class.isAssignableFrom(other.getClass())) 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 60e99bd..7ba6fee 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -40,7 +40,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -64,7 +64,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class HITS<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        private static final String CHANGE_IN_SCORES = "change in scores";
 
@@ -135,7 +135,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet 
other) {
                Preconditions.checkNotNull(other);
 
                if (! HITS.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 512a7a0..1514866 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -37,7 +37,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.FloatValue;
 import org.apache.flink.types.IntValue;
@@ -71,7 +71,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        private static final int GROUP_SIZE = 64;
 
@@ -133,7 +133,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet 
other) {
                Preconditions.checkNotNull(other);
 
                if (! AdamicAdar.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 7783e6b..1e406fa 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -61,7 +61,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        public static final int DEFAULT_GROUP_SIZE = 64;
 
@@ -159,7 +159,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, 
Result<K>> {
        }
 
        @Override
-       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+       protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet 
other) {
                Preconditions.checkNotNull(other);
 
                if (! JaccardIndex.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
deleted file mode 100644
index a2d724d..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils.proxy;
-
-import javassist.util.proxy.MethodFilter;
-import javassist.util.proxy.MethodHandler;
-import javassist.util.proxy.ProxyFactory;
-import javassist.util.proxy.ProxyObject;
-import org.objenesis.ObjenesisStd;
-
-import java.lang.reflect.Method;
-
-/**
- * Wraps an object with a proxy delegate whose method handler invokes all
- * method calls on the wrapped object. This object can be later replaced.
- *
- * @param <X> the type of the proxied object
- */
-public class Delegate<X> {
-       private X obj;
-
-       private X proxy = null;
-
-       /**
-        * Set the initial delegated object.
-        *
-        * @param obj delegated object
-        */
-       public Delegate(X obj) {
-               setObject(obj);
-       }
-
-       /**
-        * Change the delegated object.
-        *
-        * @param obj delegated object
-        */
-       public void setObject(X obj) {
-               this.obj = (obj instanceof ReferentProxy) ? ((ReferentProxy<X>) obj).getProxiedObject() : obj;
-       }
-
-       /**
-        * Instantiates and returns a proxy object which subclasses the
-        * delegated object. The proxy's method handler invokes all methods
-        * on the delegated object that is set at the time of invocation.
-        *
-        * @return delegating proxy
-        */
-       @SuppressWarnings("unchecked")
-       public X getProxy() {
-               if (proxy != null) {
-                       return proxy;
-               }
-
-               ProxyFactory factory = new ProxyFactory();
-               factory.setSuperclass(obj.getClass());
-               factory.setInterfaces(new Class[]{ReferentProxy.class});
-
-               // create the class and instantiate an instance without calling a constructor
-               Class<? extends X> proxyClass = factory.createClass(new MethodFilter() {
-                       @Override
-                       public boolean isHandled(Method method) {
-                               return true;
-                       }
-               });
-               proxy = new ObjenesisStd().newInstance(proxyClass);
-
-               // create and set a handler to invoke all method calls on the delegated object
-               ((ProxyObject) proxy).setHandler(new MethodHandler() {
-                       @Override
-                       public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
-                               if (thisMethod.getName().equals("getProxiedObject")) {
-                                       // this method is provided by the ReferentProxy interface
-                                       return obj;
-                               } else {
-                                       // method visibility may be restricted
-                                       thisMethod.setAccessible(true);
-                                       return thisMethod.invoke(obj, args);
-                               }
-                       }
-               });
-
-               return proxy;
-       }
-
-       /**
-        * This interface provides access via the proxy handler to the original
-        * object being proxied. This is necessary since we cannot and should not
-        * create a proxy of a proxy but must instead proxy the original object.
-        *
-        * @param <Y> the type of the proxied object
-        */
-       protected interface ReferentProxy<Y> {
-               Y getProxiedObject();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
deleted file mode 100644
index 8e796e6..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils.proxy;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
- * type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the resultant
- * {@link DataSet} with a delegating proxy object. The delegated object can be
- * replaced when the same algorithm is run on the same input with a mergeable
- * configuration. This allows algorithms to be composed of implicitly reusable
- * algorithms without publicly sharing intermediate {@link DataSet}s.
- *
- * @param <K> ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- * @param <T> output type
- */
-public abstract class GraphAlgorithmDelegatingDataSet<K, VV, EV, T>
-implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
-
-       // each algorithm and input pair may map to multiple configurations
-       private static Map<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>> cache =
-               Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>>());
-
-       private Graph<K,VV,EV> input;
-
-       private Delegate<DataSet<T>> delegate;
-
-       /**
-        * Algorithms are identified by name rather than by class to allow subclassing.
-        *
-        * @return name of the algorithm, which may be shared by multiple classes
-        *               implementing the same algorithm and generating the same output
-        */
-       protected abstract String getAlgorithmName();
-
-       /**
-        * An algorithm must first test whether the configurations can be merged
-        * before merging individual fields.
-        *
-        * @param other the algorithm with which to compare and merge
-        * @return true if and only if configuration has been merged and the
-        *          algorithm's output can be reused
-        */
-       protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other);
-
-       /**
-        * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
-        *
-        * @param input the input graph
-        * @return the algorithm's output
-        * @throws Exception
-        */
-       protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception;
-
-       @Override
-       public final int hashCode() {
-               return new HashCodeBuilder(17, 37)
-                       .append(input)
-                       .append(getAlgorithmName())
-                       .toHashCode();
-       }
-
-       @Override
-       public final boolean equals(Object obj) {
-               if (obj == null) {
-                       return false;
-               }
-
-               if (obj == this) {
-                       return true;
-               }
-
-               if (! GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) {
-                       return false;
-               }
-
-               GraphAlgorithmDelegatingDataSet rhs = (GraphAlgorithmDelegatingDataSet) obj;
-
-               return new EqualsBuilder()
-                       .append(input, rhs.input)
-                       .append(getAlgorithmName(), rhs.getAlgorithmName())
-                       .isEquals();
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public final DataSet<T> run(Graph<K, VV, EV> input)
-                       throws Exception {
-               this.input = input;
-
-               if (cache.containsKey(this)) {
-                       for (GraphAlgorithmDelegatingDataSet<K, VV, EV, T> other : cache.get(this)) {
-                               if (mergeConfiguration(other)) {
-                                       // configuration has been merged so generate new output
-                                       DataSet<T> output = runInternal(input);
-
-                                       // update delegatee object and reuse delegate
-                                       other.delegate.setObject(output);
-                                       delegate = other.delegate;
-
-                                       return delegate.getProxy();
-                               }
-                       }
-               }
-
-               // no mergeable configuration found so generate new output
-               DataSet<T> output = runInternal(input);
-
-               // create a new delegate to wrap the algorithm output
-               delegate = new Delegate<>(output);
-
-               // cache this result
-               if (cache.containsKey(this)) {
-                       cache.get(this).add(this);
-               } else {
-                       cache.put(this, new ArrayList(Collections.singletonList(this)));
-               }
-
-               return delegate.getProxy();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
deleted file mode 100644
index 705510a..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils.proxy;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
- * type {@code T}. A {@code GraphAlgorithmDelegatingGraph} wraps the resultant
- * {@link Graph} with a delegating proxy object. The delegated object can be
- * replaced when the same algorithm is run on the same input with a mergeable
- * configuration. This allows algorithms to be composed of implicitly reusable
- * algorithms without publicly sharing intermediate {@link DataSet}s.
- *
- * @param <IN_K> input ID type
- * @param <IN_VV> input vertex value type
- * @param <IN_EV> input edge value type
- * @param <OUT_K> output ID type
- * @param <OUT_VV> output vertex value type
- * @param <OUT_EV> output edge value type
- */
-public abstract class GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, 
OUT_VV, OUT_EV>
-implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> {
-
-       // each algorithm and input pair may map to multiple configurations
-       private static Map<GraphAlgorithmDelegatingGraph, 
List<GraphAlgorithmDelegatingGraph>> cache =
-               Collections.synchronizedMap(new 
HashMap<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>>());
-
-       private Graph<IN_K, IN_VV, IN_EV> input;
-
-       private Delegate<DataSet<Vertex<OUT_K, OUT_VV>>> verticesDelegate;
-
-       private Delegate<DataSet<Edge<OUT_K, OUT_EV>>> edgesDelegate;
-
-       /**
-        * Algorithms are identified by name rather than by class to allow 
subclassing.
-        *
-        * @return name of the algorithm, which may be shared by multiple 
classes
-        *               implementing the same algorithm and generating the 
same output
-        */
-       protected abstract String getAlgorithmName();
-
-       /**
-        * An algorithm must first test whether the configurations can be merged
-        * before merging individual fields.
-        *
-        * @param other the algorithm with which to compare and merge
-        * @return true if and only if configuration has been merged and the
-        *          algorithm's output can be reused
-        */
-       protected abstract boolean 
mergeConfiguration(GraphAlgorithmDelegatingGraph other);
-
-       /**
-        * The implementation of the algorithm, renamed from {@link 
GraphAlgorithm#run(Graph)}.
-        *
-        * @param input the input graph
-        * @return the algorithm's output
-        * @throws Exception
-        */
-       protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, 
IN_VV, IN_EV> input) throws Exception;
-
-       @Override
-       public final int hashCode() {
-               return new HashCodeBuilder(17, 37)
-                       .append(input)
-                       .append(getAlgorithmName())
-                       .toHashCode();
-       }
-
-       @Override
-       public final boolean equals(Object obj) {
-               if (obj == null) {
-                       return false;
-               }
-
-               if (obj == this) {
-                       return true;
-               }
-
-               if (! 
GraphAlgorithmDelegatingGraph.class.isAssignableFrom(obj.getClass())) {
-                       return false;
-               }
-
-               GraphAlgorithmDelegatingGraph rhs = 
(GraphAlgorithmDelegatingGraph) obj;
-
-               return new EqualsBuilder()
-                       .append(input, rhs.input)
-                       .append(getAlgorithmName(), rhs.getAlgorithmName())
-                       .isEquals();
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> 
input)
-                       throws Exception {
-               this.input = input;
-
-               if (cache.containsKey(this)) {
-                       for (GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, 
OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
-                               if (mergeConfiguration(other)) {
-                                       // configuration has been merged so 
generate new output
-                                       Graph<OUT_K, OUT_VV, OUT_EV> output = 
runInternal(input);
-
-                                       // update delegatee object and reuse 
delegate
-                                       
other.verticesDelegate.setObject(output.getVertices());
-                                       verticesDelegate = 
other.verticesDelegate;
-
-                                       
other.edgesDelegate.setObject(output.getEdges());
-                                       edgesDelegate = other.edgesDelegate;
-
-                                       return 
Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), 
output.getContext());
-                               }
-                       }
-               }
-
-               // no mergeable configuration found so generate new output
-               Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
-
-               // create a new delegate to wrap the algorithm output
-               verticesDelegate = new Delegate<>(output.getVertices());
-               edgesDelegate = new Delegate<>(output.getEdges());
-
-               // cache this result
-               if (cache.containsKey(this)) {
-                       cache.get(this).add(this);
-               } else {
-                       cache.put(this, new 
ArrayList(Collections.singletonList(this)));
-               }
-
-               return Graph.fromDataSet(verticesDelegate.getProxy(), 
edgesDelegate.getProxy(), output.getContext());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
new file mode 100644
index 0000000..7a4a0e6
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.NoOpOperator;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
+ * type {@code T}. A {@code GraphAlgorithmWrappingDataSet} wraps the resultant
+ * {@link DataSet} with a {@code NoOpOperator}. The input to the wrapped
+ * operator can be replaced when the same algorithm is run on the same input
+ * with a mergeable configuration. This allows algorithms to be composed of
+ * implicitly reusable algorithms without publicly sharing intermediate
+ * {@link DataSet}s.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> output type
+ */
+public abstract class GraphAlgorithmWrappingDataSet<K, VV, EV, T>
+implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
+
+       // each algorithm and input pair may map to multiple configurations
+       private static Map<GraphAlgorithmWrappingDataSet, 
List<GraphAlgorithmWrappingDataSet>> cache =
+               Collections.synchronizedMap(new 
HashMap<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>>());
+
+       private Graph<K,VV,EV> input;
+
+       private NoOpOperator<T> wrappingOperator;
+
+       /**
+        * Algorithms are identified by name rather than by class to allow 
subclassing.
+        *
+        * @return name of the algorithm, which may be shared by multiple 
classes
+        *               implementing the same algorithm and generating the 
same output
+        */
+       protected abstract String getAlgorithmName();
+
+       /**
+        * An algorithm must first test whether the configurations can be merged
+        * before merging individual fields.
+        *
+        * @param other the algorithm with which to compare and merge
+        * @return true if and only if configuration has been merged and the
+        *          algorithm's output can be reused
+        */
+       protected abstract boolean 
mergeConfiguration(GraphAlgorithmWrappingDataSet other);
+
+       /**
+        * The implementation of the algorithm, renamed from {@link 
GraphAlgorithm#run(Graph)}.
+        *
+        * @param input the input graph
+        * @return the algorithm's output
+        * @throws Exception
+        */
+       protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) 
throws Exception;
+
+       @Override
+       public final int hashCode() {
+               return new HashCodeBuilder(17, 37)
+                       .append(input)
+                       .append(getAlgorithmName())
+                       .toHashCode();
+       }
+
+       @Override
+       public final boolean equals(Object obj) {
+               if (obj == null) {
+                       return false;
+               }
+
+               if (obj == this) {
+                       return true;
+               }
+
+               if (! 
GraphAlgorithmWrappingDataSet.class.isAssignableFrom(obj.getClass())) {
+                       return false;
+               }
+
+               GraphAlgorithmWrappingDataSet rhs = 
(GraphAlgorithmWrappingDataSet) obj;
+
+               return new EqualsBuilder()
+                       .append(input, rhs.input)
+                       .append(getAlgorithmName(), rhs.getAlgorithmName())
+                       .isEquals();
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public final DataSet<T> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               this.input = input;
+
+               if (cache.containsKey(this)) {
+                       for (GraphAlgorithmWrappingDataSet<K, VV, EV, T> other 
: cache.get(this)) {
+                               if (mergeConfiguration(other)) {
+                                       // configuration has been merged so 
generate new output
+                                       DataSet<T> output = runInternal(input);
+
+                                       other.wrappingOperator.setInput(output);
+                                       wrappingOperator = 
other.wrappingOperator;
+
+                                       return wrappingOperator;
+                               }
+                       }
+               }
+
+               // no mergeable configuration found so generate new output
+               DataSet<T> output = runInternal(input);
+
+               // create a new operator to wrap the algorithm output
+               wrappingOperator = new NoOpOperator<>(output, output.getType());
+
+               // cache this result
+               if (cache.containsKey(this)) {
+                       cache.get(this).add(this);
+               } else {
+                       cache.put(this, new 
ArrayList(Collections.singletonList(this)));
+               }
+
+               return wrappingOperator;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
new file mode 100644
index 0000000..69a6c37
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.NoOpOperator;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
+ * type {@code T}. A {@code GraphAlgorithmWrappingGraph} wraps the resultant
+ * {@link Graph} vertex and edge sets with a {@code NoOpOperator}. The input to
+ * the wrapped operators can be replaced when the same algorithm is run on the
+ * same input with a mergeable configuration. This allows algorithms to be
+ * composed of implicitly reusable algorithms without publicly sharing
+ * intermediate {@link DataSet}s.
+ *
+ * @param <IN_K> input ID type
+ * @param <IN_VV> input vertex value type
+ * @param <IN_EV> input edge value type
+ * @param <OUT_K> output ID type
+ * @param <OUT_VV> output vertex value type
+ * @param <OUT_EV> output edge value type
+ */
+public abstract class GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, OUT_K, 
OUT_VV, OUT_EV>
+implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> {
+
+       // each algorithm and input pair may map to multiple configurations
+       private static Map<GraphAlgorithmWrappingGraph, 
List<GraphAlgorithmWrappingGraph>> cache =
+               Collections.synchronizedMap(new 
HashMap<GraphAlgorithmWrappingGraph, List<GraphAlgorithmWrappingGraph>>());
+
+       private Graph<IN_K, IN_VV, IN_EV> input;
+
+       private NoOpOperator<Vertex<OUT_K, OUT_VV>> verticesWrappingOperator;
+
+       private NoOpOperator<Edge<OUT_K, OUT_EV>> edgesWrappingOperator;
+
+       /**
+        * Algorithms are identified by name rather than by class to allow 
subclassing.
+        *
+        * @return name of the algorithm, which may be shared by multiple 
classes
+        *               implementing the same algorithm and generating the 
same output
+        */
+       protected abstract String getAlgorithmName();
+
+       /**
+        * An algorithm must first test whether the configurations can be merged
+        * before merging individual fields.
+        *
+        * @param other the algorithm with which to compare and merge
+        * @return true if and only if configuration has been merged and the
+        *          algorithm's output can be reused
+        */
+       protected abstract boolean 
mergeConfiguration(GraphAlgorithmWrappingGraph other);
+
+       /**
+        * The implementation of the algorithm, renamed from {@link 
GraphAlgorithm#run(Graph)}.
+        *
+        * @param input the input graph
+        * @return the algorithm's output
+        * @throws Exception
+        */
+       protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, 
IN_VV, IN_EV> input) throws Exception;
+
+       @Override
+       public final int hashCode() {
+               return new HashCodeBuilder(17, 37)
+                       .append(input)
+                       .append(getAlgorithmName())
+                       .toHashCode();
+       }
+
+       @Override
+       public final boolean equals(Object obj) {
+               if (obj == null) {
+                       return false;
+               }
+
+               if (obj == this) {
+                       return true;
+               }
+
+               if (! 
GraphAlgorithmWrappingGraph.class.isAssignableFrom(obj.getClass())) {
+                       return false;
+               }
+
+               GraphAlgorithmWrappingGraph rhs = (GraphAlgorithmWrappingGraph) 
obj;
+
+               return new EqualsBuilder()
+                       .append(input, rhs.input)
+                       .append(getAlgorithmName(), rhs.getAlgorithmName())
+                       .isEquals();
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> 
input)
+                       throws Exception {
+               this.input = input;
+
+               if (cache.containsKey(this)) {
+                       for (GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, 
OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
+                               if (mergeConfiguration(other)) {
+                                       // configuration has been merged so 
generate new output
+                                       Graph<OUT_K, OUT_VV, OUT_EV> output = 
runInternal(input);
+
+                                       
other.verticesWrappingOperator.setInput(output.getVertices());
+                                       
other.edgesWrappingOperator.setInput(output.getEdges());
+
+                                       verticesWrappingOperator = 
other.verticesWrappingOperator;
+                                       edgesWrappingOperator = 
other.edgesWrappingOperator;
+
+                                       return 
Graph.fromDataSet(verticesWrappingOperator, edgesWrappingOperator, 
output.getContext());
+                               }
+                       }
+               }
+
+               // no mergeable configuration found so generate new output
+               Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
+
+               // create a new operator to wrap the algorithm output
+               verticesWrappingOperator = new 
NoOpOperator<>(output.getVertices(), output.getVertices().getType());
+               edgesWrappingOperator = new NoOpOperator<>(output.getEdges(), 
output.getEdges().getType());
+
+               // cache this result
+               if (cache.containsKey(this)) {
+                       cache.get(this).add(this);
+               } else {
+                       cache.put(this, new 
ArrayList(Collections.singletonList(this)));
+               }
+
+               return Graph.fromDataSet(verticesWrappingOperator, 
edgesWrappingOperator, output.getContext());
+       }
+}

Reply via email to