http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java new file mode 100644 index 0000000..705510a --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.utils.proxy; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of + * type {@code T}. A {@code GraphAlgorithmDelegatingGraph} wraps the resultant + * {@link Graph} with a delegating proxy object. The delegated object can be + * replaced when the same algorithm is run on the same input with a mergeable + * configuration. This allows algorithms to be composed of implicitly reusable + * algorithms without publicly sharing intermediate {@link DataSet}s. + * + * @param <IN_K> input ID type + * @param <IN_VV> input vertex value type + * @param <IN_EV> input edge value type + * @param <OUT_K> output ID type + * @param <OUT_VV> output vertex value type + * @param <OUT_EV> output edge value type + */ +public abstract class GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> +implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> { + + // each algorithm and input pair may map to multiple configurations + private static Map<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>> cache = + Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>>()); + + private Graph<IN_K, IN_VV, IN_EV> input; + + private Delegate<DataSet<Vertex<OUT_K, OUT_VV>>> verticesDelegate; + + private Delegate<DataSet<Edge<OUT_K, OUT_EV>>> edgesDelegate; + + /** + * Algorithms are identified by name rather than by class to allow subclassing. 
+ * + * @return name of the algorithm, which may be shared by multiple classes + * implementing the same algorithm and generating the same output + */ + protected abstract String getAlgorithmName(); + + /** + * An algorithm must first test whether the configurations can be merged + * before merging individual fields. + * + * @param other the algorithm with which to compare and merge + * @return true if and only if configuration has been merged and the + * algorithm's output can be reused + */ + protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other); + + /** + * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}. + * + * @param input the input graph + * @return the algorithm's output + * @throws Exception + */ + protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, IN_VV, IN_EV> input) throws Exception; + + @Override + public final int hashCode() { + return new HashCodeBuilder(17, 37) + .append(input) + .append(getAlgorithmName()) + .toHashCode(); + } + + @Override + public final boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (! 
GraphAlgorithmDelegatingGraph.class.isAssignableFrom(obj.getClass())) { + return false; + } + + GraphAlgorithmDelegatingGraph rhs = (GraphAlgorithmDelegatingGraph) obj; + + return new EqualsBuilder() + .append(input, rhs.input) + .append(getAlgorithmName(), rhs.getAlgorithmName()) + .isEquals(); + } + + @Override + @SuppressWarnings("unchecked") + public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> input) + throws Exception { + this.input = input; + + if (cache.containsKey(this)) { + for (GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) { + if (mergeConfiguration(other)) { + // configuration has been merged so generate new output + Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input); + + // update delegatee object and reuse delegate + other.verticesDelegate.setObject(output.getVertices()); + verticesDelegate = other.verticesDelegate; + + other.edgesDelegate.setObject(output.getEdges()); + edgesDelegate = other.edgesDelegate; + + return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext()); + } + } + } + + // no mergeable configuration found so generate new output + Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input); + + // create a new delegate to wrap the algorithm output + verticesDelegate = new Delegate<>(output.getVertices()); + edgesDelegate = new Delegate<>(output.getEdges()); + + // cache this result + if (cache.containsKey(this)) { + cache.get(this).add(this); + } else { + cache.put(this, new ArrayList(Collections.singletonList(this))); + } + + return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext()); + } +}
// Patch header, preserved verbatim from the commit notification:
//   http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
//   ----------------------------------------------------------------------
//   diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
//   new file mode 100644
//   index 0000000..70a1294
//   --- /dev/null
//   +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
//   @@ -0,0 +1,135 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// File-level declarations of the patched file, kept as text:
//   package org.apache.flink.graph.utils.proxy;
//
//   import org.apache.flink.graph.GraphAlgorithm;   (referenced from Javadoc only)

/**
 * A multi-state boolean.
 * <br/>
 * This class is used by {@link org.apache.flink.graph.GraphAlgorithm}
 * configuration options to set a default value which can be overwritten. The
 * default value is also used when algorithm configurations are merged and
 * conflict.
 */
public class OptionalBoolean {

    protected enum State {
        UNSET,
        FALSE,
        TRUE,
        CONFLICTING
    }

    private State state = State.UNSET;

    private final boolean valueIfUnset;

    private final boolean valueIfConflicting;

    /**
     * An {@code OptionalBoolean} is in one of four states: unset, false, true,
     * or conflicting. It starts out unset. It becomes true or false through
     * {@link #set(boolean)} or by merging with an object in that state, and it
     * becomes conflicting when true is merged with false or when a set or
     * unset value is merged with an object that is already conflicting. The
     * state returns to unset only through {@link #unset()}.
     *
     * @param valueIfUnset the value to return when the object's state is unset
     * @param valueIfConflicting the value to return when the object's state is conflicting
     */
    public OptionalBoolean(boolean valueIfUnset, boolean valueIfConflicting) {
        this.valueIfUnset = valueIfUnset;
        this.valueIfConflicting = valueIfConflicting;
    }

    /**
     * Get the boolean state.
     *
     * @return the explicit value when the state is true or false, otherwise
     *         the configured default for the unset or conflicting state
     */
    public boolean get() {
        switch (state) {
            case UNSET:
                return valueIfUnset;
            case FALSE:
                return false;
            case TRUE:
                return true;
            case CONFLICTING:
                return valueIfConflicting;
            default:
                // unreachable unless a new State constant is added without updating this switch
                throw new IllegalStateException("Unknown state: " + state);
        }
    }

    /**
     * Set the boolean state, replacing any previous state including conflicting.
     *
     * @param value boolean state
     */
    public void set(boolean value) {
        this.state = (value ? State.TRUE : State.FALSE);
    }

    /**
     * Reset to the unset state.
     */
    public void unset() {
        this.state = State.UNSET;
    }

    /**
     * Get the actual state.
     *
     * @return actual state
     */
    protected State getState() {
        return state;
    }

    /**
     * Two objects conflict when either one is already in the conflicting
     * state, or when one is true and the other is false. An unset object
     * conflicts only with a conflicting one.
     *
     * @param other object to test with
     * @return whether the objects conflict
     */
    public boolean conflictsWith(OptionalBoolean other) {
        return state == State.CONFLICTING
            || other.state == State.CONFLICTING
            || (state == State.TRUE && other.state == State.FALSE)
            || (state == State.FALSE && other.state == State.TRUE);
    }

    /**
     * State transitions:
     *  if the states are the same then no change
     *  if the other state is unset then no change
     *  if this state is unset then change to the other state
     *  otherwise the states disagree, so change to the conflicting state
     *
     * @param other object from which to merge state
     */
    public void mergeWith(OptionalBoolean other) {
        if (state == other.state || other.state == State.UNSET) {
            // nothing to merge
            return;
        }

        state = (state == State.UNSET) ? other.state : State.CONFLICTING;
    }
}

// Further hunk carried by the same patch, preserved from the commit notification:
//   http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
//   ----------------------------------------------------------------------
//   diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
//   index 307ff4c..37e1bb9 100644
//   --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
//   +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
//   @@ -52,7 +52,7 @@ extends AsmTestBase {
//      DataSet<Vertex<IntValue, LongValue>> targetDegrees = undirectedSimpleGraph
//      .run(new VertexDegree<IntValue, NullValue, NullValue>()
//   -  .setReduceOnTargetId(true));
//   +  .setReduceOnTargetId(true));
//      TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult);
//      }
http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java new file mode 100644 index 0000000..a771f1f --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/utils/proxy/OptionalBooleanTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.utils.proxy; + +import org.apache.flink.graph.utils.proxy.OptionalBoolean.State; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +public class OptionalBooleanTest { + + private OptionalBoolean u; + private OptionalBoolean f; + private OptionalBoolean t; + private OptionalBoolean c; + + @Before + public void setup() { + u = new OptionalBoolean(false, true); + f = new OptionalBoolean(false, true); + t = new OptionalBoolean(false, true); + c = new OptionalBoolean(false, true); + + f.set(false); + t.set(true); + + c.set(true); + c.mergeWith(f); + } + + @Test + public void testIsMismatchedWith() + throws Exception { + // unset, unset + assertFalse(u.conflictsWith(u)); + + // unset, false + assertFalse(u.conflictsWith(f)); + + // unset, true + assertFalse(u.conflictsWith(t)); + + // unset, conflicting + assertTrue(u.conflictsWith(c)); + + + // false, unset + assertFalse(f.conflictsWith(u)); + + // false, false + assertFalse(f.conflictsWith(f)); + + // false, true + assertTrue(f.conflictsWith(t)); + + // false, conflicting + assertTrue(f.conflictsWith(c)); + + + // true, unset + assertFalse(t.conflictsWith(u)); + + // true, false + assertTrue(t.conflictsWith(f)); + + // true, true + assertFalse(t.conflictsWith(t)); + + // true, conflicting + assertTrue(t.conflictsWith(c)); + + + // conflicting, unset + assertTrue(c.conflictsWith(u)); + + // conflicting, false + assertTrue(c.conflictsWith(f)); + + // conflicting, true + assertTrue(c.conflictsWith(t)); + + // conflicting, conflicting + assertTrue(c.conflictsWith(c)); + } + + @Test + public void testMergeWith() + throws Exception { + // unset, unset => unset + u.mergeWith(u); + assertEquals(State.UNSET, u.getState()); + + // unset, false => false + u.mergeWith(f); + assertEquals(State.FALSE, u.getState()); + u.unset(); + + // unset, true => true + 
u.mergeWith(t); + assertEquals(State.TRUE, u.getState()); + u.unset(); + + // unset, conflicting => conflicting + u.mergeWith(c); + assertEquals(State.CONFLICTING, u.getState()); + u.unset(); + + + // false, unset => false + f.mergeWith(u); + assertEquals(State.FALSE, f.getState()); + + // false, false => false + f.mergeWith(f); + assertEquals(State.FALSE, f.getState()); + + // false, true => conflicting + f.mergeWith(t); + assertEquals(State.CONFLICTING, f.getState()); + f.set(false); + + // false, conflicting => conflicting + f.mergeWith(c); + assertEquals(State.CONFLICTING, f.getState()); + f.set(false); + + + // true, unset => true + t.mergeWith(u); + assertEquals(State.TRUE, t.getState()); + + // true, false => conflicting + t.mergeWith(f); + assertEquals(State.CONFLICTING, t.getState()); + t.set(true); + + // true, true => true + t.mergeWith(t); + assertEquals(State.TRUE, t.getState()); + + // true, conflicting => conflicting + t.mergeWith(c); + assertEquals(State.CONFLICTING, t.getState()); + t.set(true); + + + // conflicting, unset => conflicting + c.mergeWith(u); + assertEquals(State.CONFLICTING, c.getState()); + + // conflicting, false => conflicting + c.mergeWith(f); + assertEquals(State.CONFLICTING, c.getState()); + + // conflicting, true => conflicting + c.mergeWith(t); + assertEquals(State.CONFLICTING, c.getState()); + + // conflicting, conflicting => conflicting + c.mergeWith(c); + assertEquals(State.CONFLICTING, c.getState()); + } +}
