[FLINK-3965] [gelly] Delegating GraphAlgorithm

A delegating GraphAlgorithm wraps a GraphAlgorithm result with a
delegating proxy object. The delegated object can be replaced when the
same algorithm is run on the same input with a mergeable configuration.
This allows algorithms to be composed of implicitly reusable algorithms
without publicly sharing intermediate DataSets.

This closes #2032


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/149e7a01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/149e7a01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/149e7a01

Branch: refs/heads/master
Commit: 149e7a01445b4ba494409472dc8b0b15c7221e9e
Parents: 7ab6837
Author: Greg Hogan <[email protected]>
Authored: Wed May 25 06:43:41 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Fri Jul 1 14:35:42 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |   2 +-
 .../annotate/directed/EdgeDegreesPair.java      |  27 ++-
 .../annotate/directed/EdgeSourceDegrees.java    |  27 ++-
 .../annotate/directed/EdgeTargetDegrees.java    |  27 ++-
 .../degree/annotate/directed/VertexDegrees.java |  43 ++++-
 .../annotate/directed/VertexInDegree.java       |  42 ++++-
 .../annotate/directed/VertexOutDegree.java      |  42 ++++-
 .../annotate/undirected/EdgeDegreePair.java     |  36 +++-
 .../annotate/undirected/EdgeSourceDegree.java   |  34 +++-
 .../annotate/undirected/EdgeTargetDegree.java   |  34 +++-
 .../annotate/undirected/VertexDegree.java       |  52 +++++-
 .../degree/filter/undirected/MaximumDegree.java |  57 ++++--
 .../graph/asm/simple/directed/Simplify.java     |  26 ++-
 .../graph/asm/simple/undirected/Simplify.java   |  34 +++-
 .../asm/translate/TranslateEdgeValues.java      |  35 +++-
 .../graph/asm/translate/TranslateGraphIds.java  |  35 +++-
 .../asm/translate/TranslateVertexValues.java    |  35 +++-
 .../directed/LocalClusteringCoefficient.java    |  26 ++-
 .../clustering/directed/TriangleListing.java    |  35 +++-
 .../undirected/LocalClusteringCoefficient.java  |  27 ++-
 .../clustering/undirected/TriangleListing.java  |  34 +++-
 .../flink/graph/library/link_analysis/HITS.java |  32 +++-
 .../graph/library/similarity/AdamicAdar.java    |  35 +++-
 .../graph/library/similarity/JaccardIndex.java  |  39 +++-
 .../flink/graph/utils/proxy/Delegate.java       |  95 ++++++++++
 .../proxy/GraphAlgorithmDelegatingDataSet.java  | 150 +++++++++++++++
 .../proxy/GraphAlgorithmDelegatingGraph.java    | 160 ++++++++++++++++
 .../graph/utils/proxy/OptionalBoolean.java      | 135 ++++++++++++++
 .../annotate/undirected/VertexDegreeTest.java   |   2 +-
 .../graph/utils/proxy/OptionalBooleanTest.java  | 181 +++++++++++++++++++
 30 files changed, 1424 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 530de4b..e3b2ec2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1749,7 +1749,7 @@ public abstract class DataSet<T> {
        // 
--------------------------------------------------------------------------------------------
        
        protected static void checkSameExecutionContext(DataSet<?> set1, 
DataSet<?> set2) {
-               if (set1.context != set2.context) {
+               if (set1.getExecutionEnvironment() != 
set2.getExecutionEnvironment()) {
                        throw new IllegalArgumentException("The two inputs have 
different execution contexts.");
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 40af5ce..be19613 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -24,10 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -40,7 +41,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeDegreesPair<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, 
Degrees>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, 
Degrees>>> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -58,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple3<EV, Degrees, Degrees
        }
 
        @Override
-       public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, 
EV> input)
+       protected String getAlgorithmName() {
+               return EdgeDegreesPair.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) 
{
+                       return false;
+               }
+
+               EdgeDegreesPair rhs = (EdgeDegreesPair) other;
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> 
runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // s, t, d(s)
                DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index e08ee56..ee3175e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -39,7 +40,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeSourceDegrees<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, 
Degrees>>> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, Degrees>>>> {
        }
 
        @Override
-       public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return EdgeSourceDegrees.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, 
EV> input)
                        throws Exception {
                // s, d(s)
                DataSet<Vertex<K, Degrees>> vertexDegrees = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 5110513..6ba47f2 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -39,7 +40,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeTargetDegrees<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, 
Degrees>>> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, Degrees>>>> {
        }
 
        @Override
-       public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return EdgeTargetDegrees.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, 
EV> input)
                        throws Exception {
                // t, d(t)
                DataSet<Vertex<K, Degrees>> vertexDegrees = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 1f1d4ab..363ad2e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -30,13 +30,15 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeOrder;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -48,10 +50,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexDegrees<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 
        // Optional configuration
-       private boolean includeZeroDegreeVertices = false;
+       private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(false, true);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -65,7 +67,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
Degrees>>> {
         * @return this
         */
        public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
-               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+               this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
                return this;
        }
@@ -83,7 +85,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
Degrees>>> {
        }
 
        @Override
-       public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return VertexDegrees.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! VertexDegrees.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               VertexDegrees rhs = (VertexDegrees) other;
+
+               // verify that configurations can be merged
+
+               if 
(includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+                       return false;
+               }
+
+               // merge configurations
+
+               
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // s, t, bitmask
                DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = 
input.getEdges()
@@ -103,7 +134,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
Degrees>>> {
                                .setParallelism(parallelism)
                                .name("Degree count");
 
-               if (includeZeroDegreeVertices) {
+               if (includeZeroDegreeVertices.get()) {
                        vertexDegrees = input.getVertices()
                                .leftOuterJoin(vertexDegrees)
                                .where(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 1541abd..75f2369 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -38,10 +39,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexInDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
        // Optional configuration
-       private boolean includeZeroDegreeVertices = false;
+       private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(false, true);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
         * @return this
         */
        public VertexInDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
-               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+               this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
                return this;
        }
@@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
        }
 
        @Override
-       public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return VertexInDegree.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! VertexInDegree.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               VertexInDegree rhs = (VertexInDegree) other;
+
+               // verify that configurations can be merged
+
+               if 
(includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+                       return false;
+               }
+
+               // merge configurations
+
+               
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // t
                DataSet<Vertex<K, LongValue>> targetIds = input
@@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .setParallelism(parallelism)
                                .name("Degree count");
 
-               if (includeZeroDegreeVertices) {
+               if (includeZeroDegreeVertices.get()) {
                        targetDegree = input.getVertices()
                                .leftOuterJoin(targetDegree)
                                .where(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 22c0a67..b0576f8 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -38,10 +39,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexOutDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
        // Optional configuration
-       private boolean includeZeroDegreeVertices = false;
+       private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(false, true);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
         * @return this
         */
        public VertexOutDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
-               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+               this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
                return this;
        }
@@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
        }
 
        @Override
-       public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return VertexOutDegree.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! VertexOutDegree.class.isAssignableFrom(other.getClass())) 
{
+                       return false;
+               }
+
+               VertexOutDegree rhs = (VertexOutDegree) other;
+
+               // verify that configurations can be merged
+
+               if 
(includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+                       return false;
+               }
+
+               // merge configurations
+
+               
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // s
                DataSet<Vertex<K, LongValue>> sourceIds = input
@@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .setParallelism(parallelism)
                                .name("Degree count");
 
-               if (includeZeroDegreeVertices) {
+               if (includeZeroDegreeVertices.get()) {
                        sourceDegree = input.getVertices()
                                .leftOuterJoin(sourceDegree)
                                .where(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index f27ea54..1f78566 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -41,10 +42,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeDegreePair<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, 
LongValue>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, 
LongValue, LongValue>>> {
 
        // Optional configuration
-       protected boolean reduceOnTargetId = false;
+       private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, 
false);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -58,7 +59,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple3<EV, LongValue, LongV
         * @return this
         */
        public EdgeDegreePair<K, VV, EV> setReduceOnTargetId(boolean 
reduceOnTargetId) {
-               this.reduceOnTargetId = reduceOnTargetId;
+               this.reduceOnTargetId.set(reduceOnTargetId);
 
                return this;
        }
@@ -79,18 +80,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple3<EV, LongValue, LongV
        }
 
        @Override
-       public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, 
VV, EV> input)
+       protected String getAlgorithmName() {
+               return EdgeDegreePair.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
EdgeDegreePair.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               EdgeDegreePair rhs = (EdgeDegreePair) other;
+
+               reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> 
runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // s, t, d(s)
                DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = 
input
                        .run(new EdgeSourceDegree<K, VV, EV>()
-                               .setReduceOnTargetId(reduceOnTargetId)
+                               .setReduceOnTargetId(reduceOnTargetId.get())
                                .setParallelism(parallelism));
 
                // t, d(t)
                DataSet<Vertex<K, LongValue>> vertexDegrees = input
                        .run(new VertexDegree<K, VV, EV>()
-                               .setReduceOnTargetId(reduceOnTargetId)
+                               .setReduceOnTargetId(reduceOnTargetId.get())
                                .setParallelism(parallelism));
 
                // s, t, (d(s), d(t))

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 2bba645..520723c 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -39,10 +40,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeSourceDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, 
LongValue>>> {
 
        // Optional configuration
-       private boolean reduceOnTargetId = false;
+       private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, 
false);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, LongValue>>>> {
         * @return this
         */
        public EdgeSourceDegree<K, VV, EV> setReduceOnTargetId(boolean 
reduceOnTargetId) {
-               this.reduceOnTargetId = reduceOnTargetId;
+               this.reduceOnTargetId.set(reduceOnTargetId);
 
                return this;
        }
@@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, LongValue>>>> {
        }
 
        @Override
-       public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> 
input)
+       protected String getAlgorithmName() {
+               return EdgeSourceDegree.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               EdgeSourceDegree rhs = (EdgeSourceDegree) other;
+
+               reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, 
EV> input)
                        throws Exception {
                // s, d(s)
                DataSet<Vertex<K, LongValue>> vertexDegrees = input
                        .run(new VertexDegree<K, VV, EV>()
-                               .setReduceOnTargetId(reduceOnTargetId)
+                               .setReduceOnTargetId(reduceOnTargetId.get())
                                .setParallelism(parallelism));
 
                // s, t, d(s)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 6edaf17..123c1dc 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -39,10 +40,10 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeTargetDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, 
LongValue>>> {
 
        // Optional configuration
-       private boolean reduceOnSourceId = false;
+       private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, 
false);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, LongValue>>>> {
         * @return this
         */
        public EdgeTargetDegree<K, VV, EV> setReduceOnSourceId(boolean 
reduceOnSourceId) {
-               this.reduceOnSourceId = reduceOnSourceId;
+               this.reduceOnSourceId.set(reduceOnSourceId);
 
                return this;
        }
@@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, 
Tuple2<EV, LongValue>>>> {
        }
 
        @Override
-       public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> 
input)
+       protected String getAlgorithmName() {
+               return EdgeTargetDegree.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
EdgeTargetDegree.class.isAssignableFrom(other.getClass())) {
+                       return false; // not an EdgeTargetDegree: the cast below would fail, so refuse to merge
+               }
+
+               EdgeTargetDegree rhs = (EdgeTargetDegree) other; // safe: guarded by the class check above
+
+               reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
+               parallelism = Math.min(parallelism, rhs.parallelism); // keep the smaller of the two parallelism settings
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, 
EV> input)
                        throws Exception {
                // t, d(t)
                DataSet<Vertex<K, LongValue>> vertexDegrees = input
                        .run(new VertexDegree<K, VV, EV>()
-                               .setReduceOnTargetId(!reduceOnSourceId)
+                               .setReduceOnTargetId(!reduceOnSourceId.get())
                                .setParallelism(parallelism));
 
                // s, t, d(t)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index a2c8e03..ec72222 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -20,10 +20,11 @@ package 
org.apache.flink.graph.asm.degree.annotate.undirected;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
@@ -41,12 +42,12 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
        // Optional configuration
-       private boolean includeZeroDegreeVertices = false;
+       private OptionalBoolean includeZeroDegreeVertices = new 
OptionalBoolean(false, true);
 
-       private boolean reduceOnTargetId = false;
+       private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, 
false);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -60,7 +61,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
         * @return this
         */
        public VertexDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
-               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+               this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
                return this;
        }
@@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
         * @return this
         */
        public VertexDegree<K, VV, EV> setReduceOnTargetId(boolean 
reduceOnTargetId) {
-               this.reduceOnTargetId = reduceOnTargetId;
+               this.reduceOnTargetId.set(reduceOnTargetId);
 
                return this;
        }
@@ -96,9 +97,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
        }
 
        @Override
-       public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return VertexDegree.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! VertexDegree.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               VertexDegree rhs = (VertexDegree) other;
+
+               // verify that configurations can be merged
+
+               if 
(includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+                       return false;
+               }
+
+               // merge configurations
+
+               
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+               reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
-               MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = 
reduceOnTargetId ?
+               MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = 
reduceOnTargetId.get() ?
                        new MapEdgeToTargetId<K, EV>() : new 
MapEdgeToSourceId<K, EV>();
 
                // v
@@ -115,8 +146,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, 
LongValue>>> {
                                .setParallelism(parallelism)
                                .name("Degree count");
 
-               if (includeZeroDegreeVertices) {
-                       degree = input.getVertices()
+               if (includeZeroDegreeVertices.get()) {
+                       degree = input
+                               .getVertices()
                                .leftOuterJoin(degree)
                                .where(0)
                                .equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index e7d78bb..f9cfae9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -27,9 +27,10 @@ import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -46,15 +47,15 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class MaximumDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
 
        // Required configuration
        private long maximumDegree;
 
        // Optional configuration
-       private boolean reduceOnTargetId = false;
+       private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, 
false);
 
-       private boolean broadcastHighDegreeVertices = false;
+       private OptionalBoolean broadcastHighDegreeVertices = new 
OptionalBoolean(false, false);
 
        private int parallelism = PARALLELISM_DEFAULT;
 
@@ -79,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
         * @return this
         */
        public MaximumDegree<K, VV, EV> setReduceOnTargetId(boolean 
reduceOnTargetId) {
-               this.reduceOnTargetId = reduceOnTargetId;
+               this.reduceOnTargetId.set(reduceOnTargetId);
 
                return this;
        }
@@ -96,7 +97,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
         * @return this
         */
        public MaximumDegree<K, VV, EV> setBroadcastHighDegreeVertices(boolean 
broadcastHighDegreeVertices) {
-               this.broadcastHighDegreeVertices = broadcastHighDegreeVertices;
+               
this.broadcastHighDegreeVertices.set(broadcastHighDegreeVertices);
 
                return this;
        }
@@ -113,6 +114,36 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
                return this;
        }
 
+       @Override
+       protected String getAlgorithmName() {
+               return MaximumDegree.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! MaximumDegree.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               MaximumDegree rhs = (MaximumDegree) other;
+
+               // verify that configurations can be merged
+
+               if (maximumDegree != rhs.maximumDegree) {
+                       return false;
+               }
+
+               // merge configurations
+
+               reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+               
broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
        /*
         * Implementation notes:
         *
@@ -121,12 +152,12 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
         */
 
        @Override
-       public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+       public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // u, d(u)
                DataSet<Vertex<K, LongValue>> vertexDegree = input
                        .run(new VertexDegree<K, VV, EV>()
-                               .setReduceOnTargetId(reduceOnTargetId)
+                               .setReduceOnTargetId(reduceOnTargetId.get())
                                .setParallelism(parallelism));
 
                // u, d(u) if d(u) > maximumDegree
@@ -135,7 +166,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
                                .setParallelism(parallelism)
                                .name("Filter high-degree vertices");
 
-               JoinHint joinHint = broadcastHighDegreeVertices ? 
JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
+               JoinHint joinHint = broadcastHighDegreeVertices.get() ? 
JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
 
                // Vertices
                DataSet<Vertex<K, VV>> vertices = input
@@ -151,17 +182,17 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
                DataSet<Edge<K, EV>> edges = input
                        .getEdges()
                        .leftOuterJoin(highDegreeVertices, joinHint)
-                       .where(reduceOnTargetId ? 1 : 0)
+                       .where(reduceOnTargetId.get() ? 1 : 0)
                        .equalTo(0)
                                .with(new ProjectEdge<K, EV>())
                                .setParallelism(parallelism)
-                               .name("Project low-degree edges by " + 
(reduceOnTargetId ? "target" : "source"))
+                               .name("Project low-degree edges by " + 
(reduceOnTargetId.get() ? "target" : "source"))
                        .leftOuterJoin(highDegreeVertices, joinHint)
-                       .where(reduceOnTargetId ? 0 : 1)
+                       .where(reduceOnTargetId.get() ? 0 : 1)
                        .equalTo(0)
                        .with(new ProjectEdge<K, EV>())
                                .setParallelism(parallelism)
-                               .name("Project low-degree edges by " + 
(reduceOnTargetId ? "source" : "target"));
+                               .name("Project low-degree edges by " + 
(reduceOnTargetId.get() ? "source" : "target"));
 
                // Graph
                return Graph.fromDataSet(vertices, edges, input.getContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 7362a3e..a7fa9b6 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Preconditions;
 
@@ -38,7 +38,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;
@@ -59,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
        }
 
        @Override
-       public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return Simplify.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! Simplify.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               Simplify rhs = (Simplify) other;
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // Edges
                DataSet<Edge<K, EV>> edges = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 13ac470..d006756 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
 
        // Required configuration
        private boolean clipAndFlip;
@@ -77,7 +77,35 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
        }
 
        @Override
-       public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return Simplify.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! Simplify.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               Simplify rhs = (Simplify) other;
+
+               // verify that configurations can be merged
+
+               if (clipAndFlip != rhs.clipAndFlip) {
+                       return false;
+               }
+
+               // merge configurations
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // Edges
                DataSet<Edge<K, EV>> edges = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index 47ec077..6003c9a 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static 
org.apache.flink.graph.asm.translate.Translate.translateEdgeValues
  * @param <NEW> new edge value type
  */
 public class TranslateEdgeValues<K, VV, OLD, NEW>
-implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> {
 
        // Required configuration
        private TranslateFunction<OLD,NEW> translator;
@@ -71,7 +71,36 @@ implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
        }
 
        @Override
-       public Graph<K, VV, NEW> run(Graph<K, VV, OLD> input) throws Exception {
+       protected String getAlgorithmName() {
+               return TranslateEdgeValues.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               TranslateEdgeValues rhs = (TranslateEdgeValues) other;
+
+               // verify that configurations can be merged
+
+               if (translator != rhs.translator) {
+                       return false;
+               }
+
+               // merge configurations
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public Graph<K, VV, NEW> runInternal(Graph<K, VV, OLD> input)
+                       throws Exception {
                DataSet<Edge<K, NEW>> translatedEdges = 
translateEdgeValues(input.getEdges(), translator, parallelism);
 
                return Graph.fromDataSet(input.getVertices(), translatedEdges, 
input.getContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 6a06feb..6ea56eb 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -21,8 +21,8 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -38,7 +38,7 @@ import static 
org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
  * @param <EV> edge value type
  */
 public class TranslateGraphIds<OLD, NEW, VV, EV>
-implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> {
 
        // Required configuration
        private TranslateFunction<OLD,NEW> translator;
@@ -73,7 +73,36 @@ implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
        }
 
        @Override
-       public Graph<NEW, VV, EV> run(Graph<OLD, VV, EV> input) throws 
Exception {
+       protected String getAlgorithmName() {
+               return TranslateGraphIds.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               TranslateGraphIds rhs = (TranslateGraphIds) other;
+
+               // verify that configurations can be merged
+
+               if (translator != rhs.translator) {
+                       return false;
+               }
+
+               // merge configurations
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public Graph<NEW, VV, EV> runInternal(Graph<OLD, VV, EV> input)
+                       throws Exception {
                // Vertices
                DataSet<Vertex<NEW, VV>> translatedVertices = 
translateVertexIds(input.getVertices(), translator, parallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 3d0133a..3a49324 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -20,8 +20,8 @@ package org.apache.flink.graph.asm.translate;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static 
org.apache.flink.graph.asm.translate.Translate.translateVertexValu
  * @param <EV> edge value type
  */
 public class TranslateVertexValues<K, OLD, NEW, EV>
-implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> {
 
        // Required configuration
        private TranslateFunction<OLD, NEW> translator;
@@ -71,7 +71,36 @@ implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
        }
 
        @Override
-       public Graph<K, NEW, EV> run(Graph<K, OLD, EV> input) throws Exception {
+       protected String getAlgorithmName() {
+               return TranslateVertexValues.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph 
other) {
+               Preconditions.checkNotNull(other);
+
+               if (! 
TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               TranslateVertexValues rhs = (TranslateVertexValues) other;
+
+               // verify that configurations can be merged
+
+               if (translator != rhs.translator) {
+                       return false;
+               }
+
+               // merge configurations
+
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+       @Override
+       public Graph<K, NEW, EV> runInternal(Graph<K, OLD, EV> input)
+                       throws Exception {
                DataSet<Vertex<K, NEW>> translatedVertices = 
translateVertexValues(input.getVertices(), translator, parallelism);
 
                return Graph.fromDataSet(translatedVertices, input.getEdges(), 
input.getContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index bbc167e..537ad0f 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
        private int littleParallelism = PARALLELISM_DEFAULT;
@@ -74,6 +74,25 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 
                return this;
        }
+       @Override
+       protected String getAlgorithmName() {
+               return LocalClusteringCoefficient.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+               Preconditions.checkNotNull(other);
+
+               if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
+
+               littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+               return true;
+       }
 
        /*
         * Implementation notes:
@@ -86,12 +105,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
         */
 
        @Override
-       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+       public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // u, v, w, bitmask
                DataSet<TriangleListing.Result<K>> triangles = input
                        .run(new TriangleListing<K,VV,EV>()
-                               .setSortTriangleVertices(false)
                                .setLittleParallelism(littleParallelism));
 
                // u, edge count

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 5c364f5..14c731a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -33,9 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeOrder;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Collector;
@@ -60,10 +62,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
-       private boolean sortTriangleVertices = false;
+       private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
 
        private int littleParallelism = PARALLELISM_DEFAULT;
 
@@ -75,7 +77,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
         * @return this
         */
        public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
-               this.sortTriangleVertices = sortTriangleVertices;
+               this.sortTriangleVertices.set(sortTriangleVertices);
 
                return this;
        }
@@ -95,6 +97,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
                return this;
        }
 
+       @Override
+       protected String getAlgorithmName() {
+               return TriangleListing.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+               Preconditions.checkNotNull(other);
+
+               if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               TriangleListing rhs = (TriangleListing) other;
+
+               sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
+               littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+               return true;
+       }
+
        /*
         * Implementation notes:
         *
@@ -106,7 +129,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
         */
 
        @Override
-       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+       public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // u, v, bitmask where u < v
                DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
@@ -151,7 +174,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
                                .setParallelism(littleParallelism)
                                .name("Triangle listing");
 
-               if (sortTriangleVertices) {
+               if (sortTriangleVertices.get()) {
                        triangles = triangles
                                .map(new SortTriangleVertices<K>())
                                        .name("Sort triangle vertices");

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 6858818..8f707fd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        // Optional configuration
        private int littleParallelism = PARALLELISM_DEFAULT;
@@ -75,6 +75,26 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
                return this;
        }
 
+       @Override
+       protected String getAlgorithmName() {
+               return LocalClusteringCoefficient.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+               Preconditions.checkNotNull(other);
+
+               if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
+
+               littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+               return true;
+       }
+
        /*
         * Implementation notes:
         *
@@ -86,12 +106,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
         */
 
        @Override
-       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+       public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // u, v, w
                DataSet<Tuple3<K,K,K>> triangles = input
                        .run(new TriangleListing<K,VV,EV>()
-                               .setSortTriangleVertices(false)
                                .setLittleParallelism(littleParallelism));
 
                // u, 1

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 4f8ce7a..89b86fe 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -32,8 +32,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -62,10 +63,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 
        // Optional configuration
-       private boolean sortTriangleVertices = false;
+       private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
 
        private int littleParallelism = PARALLELISM_DEFAULT;
 
@@ -77,7 +78,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
         * @return this
         */
        public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
-               this.sortTriangleVertices = sortTriangleVertices;
+               this.sortTriangleVertices.set(sortTriangleVertices);
 
                return this;
        }
@@ -97,6 +98,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
                return this;
        }
 
+       @Override
+       protected String getAlgorithmName() {
+               return TriangleListing.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+               Preconditions.checkNotNull(other);
+
+               if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               TriangleListing rhs = (TriangleListing) other;
+
+               sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
+               littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+               return true;
+       }
+
        /*
         * Implementation notes:
         *
@@ -108,7 +130,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
         */
 
        @Override
-       public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input)
+       public DataSet<Tuple3<K, K, K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // u, v where u < v
                DataSet<Tuple2<K, K>> filteredByID = input
@@ -145,7 +167,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
                                .setParallelism(littleParallelism)
                                .name("Triangle listing");
 
-               if (sortTriangleVertices) {
+               if (sortTriangleVertices.get()) {
                        triangles = triangles
                                .map(new SortTriangleVertices<K>())
                                        .name("Sort triangle vertices");

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 00035e4..b88badb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -36,9 +36,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -62,7 +63,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class HITS<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        private static final String CHANGE_IN_SCORES = "change in scores";
 
@@ -128,7 +129,32 @@ implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> {
        }
 
        @Override
-       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+       protected String getAlgorithmName() {
+               return HITS.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+               Preconditions.checkNotNull(other);
+
+               if (! HITS.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               HITS rhs = (HITS) other;
+
+               // merge configurations
+
+               maxIterations = Math.max(maxIterations, rhs.maxIterations);
+               convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
+               parallelism = Math.min(parallelism, rhs.parallelism);
+
+               return true;
+       }
+
+
+       @Override
+       public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                DataSet<Tuple2<K, K>> edges = input
                        .getEdges()

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index a164a5f..512a7a0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -33,11 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.FloatValue;
 import org.apache.flink.types.IntValue;
@@ -71,7 +71,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        private static final int GROUP_SIZE = 64;
 
@@ -127,6 +127,35 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
                return this;
        }
 
+       @Override
+       protected String getAlgorithmName() {
+               return AdamicAdar.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+               Preconditions.checkNotNull(other);
+
+               if (! AdamicAdar.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               AdamicAdar rhs = (AdamicAdar) other;
+
+               // verify that configurations can be merged
+
+               if (minimumRatio != rhs.minimumRatio ||
+                       minimumScore != rhs.minimumScore) {
+                       return false;
+               }
+
+               // merge configurations
+
+               littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+               return true;
+       }
+
        /*
         * Implementation notes:
         *
@@ -136,7 +165,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
         */
 
        @Override
-       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+       public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // s, d(s), 1/log(d(s))
                DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index c731984..7783e6b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -61,7 +61,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
        public static final int DEFAULT_GROUP_SIZE = 64;
 
@@ -153,6 +153,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
                return this;
        }
 
+       @Override
+       protected String getAlgorithmName() {
+               return JaccardIndex.class.getName();
+       }
+
+       @Override
+       protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+               Preconditions.checkNotNull(other);
+
+               if (! JaccardIndex.class.isAssignableFrom(other.getClass())) {
+                       return false;
+               }
+
+               JaccardIndex rhs = (JaccardIndex) other;
+
+               // verify that configurations can be merged
+
+               if (unboundedScores != rhs.unboundedScores ||
+                       minimumScoreNumerator != rhs.minimumScoreNumerator ||
+                       minimumScoreDenominator != rhs.minimumScoreDenominator ||
+                       maximumScoreNumerator != rhs.maximumScoreNumerator ||
+                       maximumScoreDenominator != rhs.maximumScoreDenominator) {
+                       return false;
+               }
+
+               // merge configurations
+
+               groupSize = Math.max(groupSize, rhs.groupSize);
+               littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+               return true;
+       }
+
        /*
         * Implementation notes:
         *
@@ -162,7 +195,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
         */
 
        @Override
-       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+       public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // s, t, d(t)
                DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
new file mode 100644
index 0000000..164125c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+import javassist.util.proxy.ProxyObject;
+import org.objenesis.ObjenesisStd;
+
+import java.lang.reflect.Method;
+
+/**
+ * Wraps an object with a proxy delegate whose method handler invokes all
+ * method calls on the wrapped object. This object can be later replaced.
+ *
+ * @param <X>
+ */
+public class Delegate<X> {
+       private X obj;
+
+       private X proxy = null;
+
+       /**
+        * Set the initial delegated object.
+        *
+        * @param obj delegated object
+        */
+       public Delegate(X obj) {
+               setObject(obj);
+       }
+
+       /**
+        * Change the delegated object.
+        *
+        * @param obj delegated object
+        */
+       public void setObject(X obj) {
+               this.obj = obj;
+       }
+
+       /**
+        * Instantiates and returns a proxy object which subclasses the
+        * delegated object. The proxy's method handler invokes all methods
+        * on the delegated object that is set at the time of invocation.
+        *
+        * @return delegating proxy
+        */
+       @SuppressWarnings("unchecked")
+       public X getProxy() {
+               if (proxy != null) {
+                       return proxy;
+               }
+
+               ProxyFactory factory = new ProxyFactory();
+               factory.setSuperclass(obj.getClass());
+
+               // create the class and instantiate an instance without calling a constructor
+               Class<? extends X> proxyClass = factory.createClass(new MethodFilter() {
+                       @Override
+                       public boolean isHandled(Method method) {
+                               return true;
+                       }
+               });
+               proxy = new ObjenesisStd().newInstance(proxyClass);
+
+               // create and set a handler to invoke all method calls on the delegated object
+               ((ProxyObject) proxy).setHandler(new MethodHandler() {
+                       @Override
+                       public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
+                               // method visibility may be restricted
+                               thisMethod.setAccessible(true);
+                               return thisMethod.invoke(obj, args);
+                       }
+               });
+
+               return proxy;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
new file mode 100644
index 0000000..8e796e6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
+ * type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the 
resultant
+ * {@link DataSet} with a delegating proxy object. The delegated object can be
+ * replaced when the same algorithm is run on the same input with a mergeable
+ * configuration. This allows algorithms to be composed of implicitly reusable
+ * algorithms without publicly sharing intermediate {@link DataSet}s.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> output type
+ */
+public abstract class GraphAlgorithmDelegatingDataSet<K, VV, EV, T>
+implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
+
+       // each algorithm and input pair may map to multiple configurations
+       private static Map<GraphAlgorithmDelegatingDataSet, 
List<GraphAlgorithmDelegatingDataSet>> cache =
+               Collections.synchronizedMap(new 
HashMap<GraphAlgorithmDelegatingDataSet, 
List<GraphAlgorithmDelegatingDataSet>>());
+
+       private Graph<K,VV,EV> input;
+
+       private Delegate<DataSet<T>> delegate;
+
+       /**
+        * Algorithms are identified by name rather than by class to allow 
subclassing.
+        *
+        * @return name of the algorithm, which may be shared by multiple 
classes
+        *               implementing the same algorithm and generating the 
same output
+        */
+       protected abstract String getAlgorithmName();
+
+       /**
+        * An algorithm must first test whether the configurations can be merged
+        * before merging individual fields.
+        *
+        * @param other the algorithm with which to compare and merge
+        * @return true if and only if configuration has been merged and the
+        *          algorithm's output can be reused
+        */
+       protected abstract boolean 
mergeConfiguration(GraphAlgorithmDelegatingDataSet other);
+
+       /**
+        * The implementation of the algorithm, renamed from {@link 
GraphAlgorithm#run(Graph)}.
+        *
+        * @param input the input graph
+        * @return the algorithm's output
+        * @throws Exception
+        */
+       protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) 
throws Exception;
+
+       @Override
+       public final int hashCode() {
+               return new HashCodeBuilder(17, 37)
+                       .append(input)
+                       .append(getAlgorithmName())
+                       .toHashCode();
+       }
+
+       @Override
+       public final boolean equals(Object obj) {
+               if (obj == null) {
+                       return false;
+               }
+
+               if (obj == this) {
+                       return true;
+               }
+
+               if (! 
GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) {
+                       return false;
+               }
+
+               GraphAlgorithmDelegatingDataSet rhs = 
(GraphAlgorithmDelegatingDataSet) obj;
+
+               return new EqualsBuilder()
+                       .append(input, rhs.input)
+                       .append(getAlgorithmName(), rhs.getAlgorithmName())
+                       .isEquals();
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public final DataSet<T> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               this.input = input;
+
+               if (cache.containsKey(this)) {
+                       for (GraphAlgorithmDelegatingDataSet<K, VV, EV, T> 
other : cache.get(this)) {
+                               if (mergeConfiguration(other)) {
+                                       // configuration has been merged so 
generate new output
+                                       DataSet<T> output = runInternal(input);
+
+                                       // update delegatee object and reuse 
delegate
+                                       other.delegate.setObject(output);
+                                       delegate = other.delegate;
+
+                                       return delegate.getProxy();
+                               }
+                       }
+               }
+
+               // no mergeable configuration found so generate new output
+               DataSet<T> output = runInternal(input);
+
+               // create a new delegate to wrap the algorithm output
+               delegate = new Delegate<>(output);
+
+               // cache this result
+               if (cache.containsKey(this)) {
+                       cache.get(this).add(this);
+               } else {
+                       cache.put(this, new 
ArrayList(Collections.singletonList(this)));
+               }
+
+               return delegate.getProxy();
+       }
+}

Reply via email to