http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
new file mode 100644
index 0000000..705510a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
+ * type {@code T}. A {@code GraphAlgorithmDelegatingGraph} wraps the resultant
+ * {@link Graph} with a delegating proxy object. The delegated object can be
+ * replaced when the same algorithm is run on the same input with a mergeable
+ * configuration. This allows algorithms to be composed of implicitly reusable
+ * algorithms without publicly sharing intermediate {@link DataSet}s.
+ *
+ * @param <IN_K> input ID type
+ * @param <IN_VV> input vertex value type
+ * @param <IN_EV> input edge value type
+ * @param <OUT_K> output ID type
+ * @param <OUT_VV> output vertex value type
+ * @param <OUT_EV> output edge value type
+ */
+public abstract class GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, 
OUT_VV, OUT_EV>
+implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> {
+
+       // each algorithm and input pair may map to multiple configurations
+       private static Map<GraphAlgorithmDelegatingGraph, 
List<GraphAlgorithmDelegatingGraph>> cache =
+               Collections.synchronizedMap(new 
HashMap<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>>());
+
+       private Graph<IN_K, IN_VV, IN_EV> input;
+
+       private Delegate<DataSet<Vertex<OUT_K, OUT_VV>>> verticesDelegate;
+
+       private Delegate<DataSet<Edge<OUT_K, OUT_EV>>> edgesDelegate;
+
+       /**
+        * Algorithms are identified by name rather than by class to allow 
subclassing.
+        *
+        * @return name of the algorithm, which may be shared by multiple 
classes
+        *               implementing the same algorithm and generating the 
same output
+        */
+       protected abstract String getAlgorithmName();
+
+       /**
+        * An algorithm must first test whether the configurations can be merged
+        * before merging individual fields.
+        *
+        * @param other the algorithm with which to compare and merge
+        * @return true if and only if configuration has been merged and the
+        *          algorithm's output can be reused
+        */
+       protected abstract boolean 
mergeConfiguration(GraphAlgorithmDelegatingGraph other);
+
+       /**
+        * The implementation of the algorithm, renamed from {@link 
GraphAlgorithm#run(Graph)}.
+        *
+        * @param input the input graph
+        * @return the algorithm's output
+        * @throws Exception
+        */
+       protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, 
IN_VV, IN_EV> input) throws Exception;
+
+       @Override
+       public final int hashCode() {
+               return new HashCodeBuilder(17, 37)
+                       .append(input)
+                       .append(getAlgorithmName())
+                       .toHashCode();
+       }
+
+       @Override
+       public final boolean equals(Object obj) {
+               if (obj == null) {
+                       return false;
+               }
+
+               if (obj == this) {
+                       return true;
+               }
+
+               if (! 
GraphAlgorithmDelegatingGraph.class.isAssignableFrom(obj.getClass())) {
+                       return false;
+               }
+
+               GraphAlgorithmDelegatingGraph rhs = 
(GraphAlgorithmDelegatingGraph) obj;
+
+               return new EqualsBuilder()
+                       .append(input, rhs.input)
+                       .append(getAlgorithmName(), rhs.getAlgorithmName())
+                       .isEquals();
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> 
input)
+                       throws Exception {
+               this.input = input;
+
+               if (cache.containsKey(this)) {
+                       for (GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, 
OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
+                               if (mergeConfiguration(other)) {
+                                       // configuration has been merged so 
generate new output
+                                       Graph<OUT_K, OUT_VV, OUT_EV> output = 
runInternal(input);
+
+                                       // update delegatee object and reuse 
delegate
+                                       
other.verticesDelegate.setObject(output.getVertices());
+                                       verticesDelegate = 
other.verticesDelegate;
+
+                                       
other.edgesDelegate.setObject(output.getEdges());
+                                       edgesDelegate = other.edgesDelegate;
+
+                                       return 
Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), 
output.getContext());
+                               }
+                       }
+               }
+
+               // no mergeable configuration found so generate new output
+               Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
+
+               // create a new delegate to wrap the algorithm output
+               verticesDelegate = new Delegate<>(output.getVertices());
+               edgesDelegate = new Delegate<>(output.getEdges());
+
+               // cache this result
+               if (cache.containsKey(this)) {
+                       cache.get(this).add(this);
+               } else {
+                       cache.put(this, new 
ArrayList(Collections.singletonList(this)));
+               }
+
+               return Graph.fromDataSet(verticesDelegate.getProxy(), 
edgesDelegate.getProxy(), output.getContext());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
new file mode 100644
index 0000000..70a1294
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.flink.graph.GraphAlgorithm;
+
+/**
+ * A multi-state boolean.
+ * <br/>
+ * This class is used by {@link GraphAlgorithm} configuration options to set a
+ * default value which can be overwritten. The default value is also used when
+ * algorithm configurations are merged and conflict.
+ */
+public class OptionalBoolean {
+
+       protected enum State {
+               UNSET,
+               FALSE,
+               TRUE,
+               CONFLICTING
+       }
+
+       private State state = State.UNSET;
+
+       private final boolean valueIfUnset;
+
+       private final boolean valueIfConflicting;
+
+       /**
+        * An {@code OptionalBoolean} has four states: unset, false, true, and
+        * conflicting. Merging into an unset object copies the other state, and
+        * merging true with false yields conflicting. Only unset() restores unset.
+        *
+        * @param valueIfUnset the value to return when the object's state is 
unset
+        * @param valueIfConflicting the value to return when the object's 
state is conflicting
+        */
+       public OptionalBoolean(boolean valueIfUnset, boolean 
valueIfConflicting) {
+               this.valueIfUnset = valueIfUnset;
+               this.valueIfConflicting = valueIfConflicting;
+       }
+
+       /**
+        * Get the boolean state.
+        *
+        * @return boolean state
+        */
+       public boolean get() {
+               switch (state) {
+                       case UNSET:
+                               return valueIfUnset;
+                       case FALSE:
+                               return false;
+                       case TRUE:
+                               return true;
+                       case CONFLICTING:
+                               return valueIfConflicting;
+                       default:
+                               throw new RuntimeException("Unknown state");
+               }
+       }
+
+       /**
+        * Set the boolean state.
+        *
+        * @param value boolean state
+        */
+       public void set(boolean value) {
+               this.state = (value ? State.TRUE : State.FALSE);
+       }
+
+       /**
+        * Reset to the unset state.
+        */
+       public void unset() {
+               this.state = State.UNSET;
+       }
+
+       /**
+        * Get the actual state.
+        *
+        * @return actual state
+        */
+       protected State getState() {
+               return state;
+       }
+
+       /**
+        * Objects conflict if one is true and the other false, or if either is conflicting.
+        *
+        * @param other object to test with
+        * @return whether the objects conflict
+        */
+       public boolean conflictsWith(OptionalBoolean other) {
+               return state == State.CONFLICTING
+                       || other.state == State.CONFLICTING
+                       || (state == State.TRUE && other.state == State.FALSE)
+                       || (state == State.FALSE && other.state == State.TRUE);
+       }
+
+       /**
+        * State transitions:
+        *  if the states are the same then no change
+        *  if either state is unset then change to the other state
+        *  otherwise (states differ, neither unset) set to the conflicting state
+        *
+        * @param other object from which to merge state
+        */
+       public void mergeWith(OptionalBoolean other) {
+               if (state == other.state) {
+                       // no change in state
+               } else if (state == State.UNSET) {
+                       state = other.state;
+               } else if (other.state == State.UNSET) {
+                       // no change in state
+               } else {
+                       state = State.CONFLICTING;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index 307ff4c..37e1bb9 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -52,7 +52,7 @@ extends AsmTestBase {
 
                DataSet<Vertex<IntValue, LongValue>> targetDegrees = 
undirectedSimpleGraph
                        .run(new VertexDegree<IntValue, NullValue, NullValue>()
-                       .setReduceOnTargetId(true));
+                               .setReduceOnTargetId(true));
 
                TestBaseUtils.compareResultAsText(targetDegrees.collect(), 
expectedResult);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java
new file mode 100644
index 0000000..a771f1f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.flink.graph.utils.proxy.OptionalBoolean.State;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+public class OptionalBooleanTest {
+
+       private OptionalBoolean u;
+       private OptionalBoolean f;
+       private OptionalBoolean t;
+       private OptionalBoolean c;
+
+       @Before
+       public void setup() {
+               u = new OptionalBoolean(false, true);
+               f = new OptionalBoolean(false, true);
+               t = new OptionalBoolean(false, true);
+               c = new OptionalBoolean(false, true);
+
+               f.set(false);
+               t.set(true);
+
+               c.set(true);
+               c.mergeWith(f);
+       }
+
+       @Test
+       public void testIsMismatchedWith()
+                       throws Exception {
+               // unset, unset
+               assertFalse(u.conflictsWith(u));
+
+               // unset, false
+               assertFalse(u.conflictsWith(f));
+
+               // unset, true
+               assertFalse(u.conflictsWith(t));
+
+               // unset, conflicting
+               assertTrue(u.conflictsWith(c));
+
+
+               // false, unset
+               assertFalse(f.conflictsWith(u));
+
+               // false, false
+               assertFalse(f.conflictsWith(f));
+
+               // false, true
+               assertTrue(f.conflictsWith(t));
+
+               // false, conflicting
+               assertTrue(f.conflictsWith(c));
+
+
+               // true, unset
+               assertFalse(t.conflictsWith(u));
+
+               // true, false
+               assertTrue(t.conflictsWith(f));
+
+               // true, true
+               assertFalse(t.conflictsWith(t));
+
+               // true, conflicting
+               assertTrue(t.conflictsWith(c));
+
+
+               // conflicting, unset
+               assertTrue(c.conflictsWith(u));
+
+               // conflicting, false
+               assertTrue(c.conflictsWith(f));
+
+               // conflicting, true
+               assertTrue(c.conflictsWith(t));
+
+               // conflicting, conflicting
+               assertTrue(c.conflictsWith(c));
+       }
+
+       @Test
+       public void testMergeWith()
+                       throws Exception {
+               // unset, unset => unset
+               u.mergeWith(u);
+               assertEquals(State.UNSET, u.getState());
+
+               // unset, false => false
+               u.mergeWith(f);
+               assertEquals(State.FALSE, u.getState());
+               u.unset();
+
+               // unset, true => true
+               u.mergeWith(t);
+               assertEquals(State.TRUE, u.getState());
+               u.unset();
+
+               // unset, conflicting => conflicting
+               u.mergeWith(c);
+               assertEquals(State.CONFLICTING, u.getState());
+               u.unset();
+
+
+               // false, unset => false
+               f.mergeWith(u);
+               assertEquals(State.FALSE, f.getState());
+
+               // false, false => false
+               f.mergeWith(f);
+               assertEquals(State.FALSE, f.getState());
+
+               // false, true => conflicting
+               f.mergeWith(t);
+               assertEquals(State.CONFLICTING, f.getState());
+               f.set(false);
+
+               // false, conflicting => conflicting
+               f.mergeWith(c);
+               assertEquals(State.CONFLICTING, f.getState());
+               f.set(false);
+
+
+               // true, unset => true
+               t.mergeWith(u);
+               assertEquals(State.TRUE, t.getState());
+
+               // true, false => conflicting
+               t.mergeWith(f);
+               assertEquals(State.CONFLICTING, t.getState());
+               t.set(true);
+
+               // true, true => true
+               t.mergeWith(t);
+               assertEquals(State.TRUE, t.getState());
+
+               // true, conflicting => conflicting
+               t.mergeWith(c);
+               assertEquals(State.CONFLICTING, t.getState());
+               t.set(true);
+
+
+               // conflicting, unset => conflicting
+               c.mergeWith(u);
+               assertEquals(State.CONFLICTING, c.getState());
+
+               // conflicting, false => conflicting
+               c.mergeWith(f);
+               assertEquals(State.CONFLICTING, c.getState());
+
+               // conflicting, true => conflicting
+               c.mergeWith(t);
+               assertEquals(State.CONFLICTING, c.getState());
+
+               // conflicting, conflicting => conflicting
+               c.mergeWith(c);
+               assertEquals(State.CONFLICTING, c.getState());
+       }
+}

Reply via email to