[FLINK-6393] [gelly] Add Circulant and Echo graph generators This closes #3802
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ee8c69a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ee8c69a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ee8c69a Branch: refs/heads/master Commit: 3ee8c69aa4390a8d51b33f262f719fb1a5474d51 Parents: 54c8826 Author: FlorianFan <[email protected]> Authored: Thu Apr 27 20:41:53 2017 +0800 Committer: Greg Hogan <[email protected]> Committed: Thu May 11 12:42:25 2017 -0400 ---------------------------------------------------------------------- docs/dev/libs/gelly/graph_generators.md | 91 ++++++- .../java/org/apache/flink/graph/Runner.java | 11 +- .../graph/drivers/input/CirculantGraph.java | 129 ++++++++++ .../flink/graph/drivers/input/EchoGraph.java | 67 +++++ .../flink/graph/drivers/input/GridGraph.java | 3 +- .../flink/graph/drivers/EdgeListITCase.java | 90 ++++++- .../flink/graph/generator/CirculantGraph.java | 246 +++++++++++++++++++ .../flink/graph/generator/CompleteGraph.java | 57 +---- .../apache/flink/graph/generator/EchoGraph.java | 82 +++++++ .../apache/flink/graph/generator/GridGraph.java | 4 +- .../graph/generator/CirculantGraphTest.java | 88 +++++++ .../flink/graph/generator/EchoGraphTest.java | 128 ++++++++++ 12 files changed, 926 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/docs/dev/libs/gelly/graph_generators.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/gelly/graph_generators.md b/docs/dev/libs/gelly/graph_generators.md index 2532ee4..d4ad229 100644 --- a/docs/dev/libs/gelly/graph_generators.md +++ b/docs/dev/libs/gelly/graph_generators.md @@ -72,6 +72,42 @@ val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDime </div> </div> +## Circulant Graph + +A [circulant graph](http://mathworld.wolfram.com/CirculantGraph.html) is 
an +[oriented graph](http://mathworld.wolfram.com/OrientedGraph.html) configured +with one or more contiguous ranges of offsets. Edges connect integer vertex IDs +whose difference equals a configured offset. The circulant graph with no offsets +is the [empty graph](#empty-graph) and the graph with the maximum range is the +[complete graph](#complete-graph). + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +long vertexCount = 5; + +Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, vertexCount) + .addRange(1, 2) + .generate(); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.CirculantGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val vertexCount = 5 + +val graph = new CirculantGraph(env.getJavaEnv, vertexCount).addRange(1, 2).generate() +{% endhighlight %} +</div> +</div> + ## Complete Graph An undirected graph connecting every distinct pair of vertices. 
@@ -83,7 +119,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long vertexCount = 5; -Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount) +Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount) .generate(); {% endhighlight %} </div> @@ -148,7 +184,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long vertexCount = 5; -Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount) +Graph<LongValue, NullValue, NullValue> graph = new CycleGraph(env, vertexCount) .generate(); {% endhighlight %} </div> @@ -193,6 +229,41 @@ val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate() <text x="51" y="199">4</text> </svg> +## Echo Graph + +An [echo graph](http://mathworld.wolfram.com/EchoGraph.html) is a +[circulant graph](#circulant-graph) with `n` vertices defined by the width of a +single range of offsets centered at `n/2`. A vertex is connected to 'far' +vertices, which connect to 'near' vertices, which connect to 'far' vertices, .... + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +long vertexCount = 5; +long vertexDegree = 2; + +Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, vertexCount, vertexDegree) + .generate(); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.EchoGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val vertexCount = 5 +val vertexDegree = 2 + +val graph = new EchoGraph(env.getJavaEnv, vertexCount, vertexDegree).generate() +{% endhighlight %} +</div> +</div> + ## Empty Graph A graph containing no edges. 
@@ -204,7 +275,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long vertexCount = 5; -Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount) +Graph<LongValue, NullValue, NullValue> graph = new EmptyGraph(env, vertexCount) .generate(); {% endhighlight %} </div> @@ -257,7 +328,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); boolean wrapEndpoints = false; -Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env) +Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env) .addDimension(2, wrapEndpoints) .addDimension(4, wrapEndpoints) .generate(); @@ -327,7 +398,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long dimensions = 3; -Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, dimensions) +Graph<LongValue, NullValue, NullValue> graph = new HypercubeGraph(env, dimensions) .generate(); {% endhighlight %} </div> @@ -403,7 +474,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long vertexCount = 5 -Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount) +Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount) .generate(); {% endhighlight %} </div> @@ -464,7 +535,7 @@ RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory() int vertexCount = 1 << scale; int edgeCount = edgeFactor * vertexCount; -Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) +Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) .generate(); {% endhighlight %} </div> @@ -505,7 +576,7 @@ int edgeCount = edgeFactor * vertexCount; boolean clipAndFlip = false; -Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) +Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) .setConstants(0.57f, 
0.19f, 0.19f) .setNoise(true, 0.10f) .generate(); @@ -542,7 +613,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long vertexPairCount = 4 // note: configured with the number of vertex pairs -Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount) +Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount) .generate(); {% endhighlight %} </div> @@ -607,7 +678,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long vertexCount = 6; -Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount) +Graph<LongValue, NullValue, NullValue> graph = new StarGraph(env, vertexCount) .generate(); {% endhighlight %} </div> http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java index 5ffe681..07cad1f 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java @@ -35,8 +35,10 @@ import org.apache.flink.graph.drivers.HITS; import org.apache.flink.graph.drivers.JaccardIndex; import org.apache.flink.graph.drivers.PageRank; import org.apache.flink.graph.drivers.TriangleListing; +import org.apache.flink.graph.drivers.input.CirculantGraph; import org.apache.flink.graph.drivers.input.CompleteGraph; import org.apache.flink.graph.drivers.input.CycleGraph; +import org.apache.flink.graph.drivers.input.EchoGraph; import org.apache.flink.graph.drivers.input.EmptyGraph; import org.apache.flink.graph.drivers.input.GridGraph; import org.apache.flink.graph.drivers.input.HypercubeGraph; @@ 
-76,9 +78,11 @@ public class Runner { private static final String OUTPUT = "output"; private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>() + .addClass(CirculantGraph.class) .addClass(CompleteGraph.class) .addClass(org.apache.flink.graph.drivers.input.CSV.class) .addClass(CycleGraph.class) + .addClass(EchoGraph.class) .addClass(EmptyGraph.class) .addClass(GridGraph.class) .addClass(HypercubeGraph.class) @@ -236,7 +240,12 @@ public class Runner { // Input - input.configure(parameters); + try { + input.configure(parameters); + } catch (RuntimeException ex) { + throw new ProgramParametrizationException(ex.getMessage()); + } + Graph graph = input.create(env); // Algorithm http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java new file mode 100644 index 0000000..14ee816 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.generator.CirculantGraph.OffsetRange; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT; + +/** + * Generate a {@link org.apache.flink.graph.generator.CirculantGraph}. 
+ */ +public class CirculantGraph +extends GeneratedGraph<LongValue> { + + private static final String PREFIX = "range"; + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + private List<OffsetRange> offsetRanges = new ArrayList<>(); + + @Override + public String getName() { + return CirculantGraph.class.getSimpleName(); + } + + @Override + public String getUsage() { + return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]]" + + super.getUsage(); + } + + @Override + public void configure(ParameterTool parameterTool) throws ProgramParametrizationException { + super.configure(parameterTool); + + // add offset ranges as ordered by offset ID (range0, range1, range2, ...) + + Map<Integer, String> offsetRangeMap = new TreeMap<>(); + + // first parse all offset ranges into a sorted map + for (String key : parameterTool.toMap().keySet()) { + if (key.startsWith(PREFIX)) { + int offsetId = Integer.parseInt(key.substring(PREFIX.length())); + offsetRangeMap.put(offsetId, parameterTool.get(key)); + } + } + + // then store offset ranges in order + for (String field : offsetRangeMap.values()) { + ProgramParametrizationException exception = new ProgramParametrizationException("Circulant offset range" + + " must use a colon to separate the integer offset and integer length:" + field + "'"); + + if (! 
field.contains(":")) { + throw exception; + } + + String[] parts = field.split(":"); + + if (parts.length != 2) { + throw exception; + } + + try { + long offset = Long.parseLong(parts[0]); + long length = Long.parseLong(parts[1]); + offsetRanges.add(new OffsetRange(offset, length)); + } catch (NumberFormatException ex) { + throw exception; + } + } + } + + @Override + public String getIdentity() { + return getTypeName() + " " + getName() + " (" + offsetRanges + ")"; + } + + @Override + protected long vertexCount() { + return vertexCount.getValue(); + } + + @Override + public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) { + org.apache.flink.graph.generator.CirculantGraph graph = new org.apache.flink.graph.generator.CirculantGraph(env, + vertexCount.getValue()); + + for (OffsetRange offsetRange : offsetRanges) { + graph.addRange(offsetRange.getOffset(), offsetRange.getLength()); + } + + return graph + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java new file mode 100644 index 0000000..c9b0874 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_COUNT; +import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_DEGREE; + +/** + * Generate an {@link org.apache.flink.graph.generator.EchoGraph}. 
+ */ +public class EchoGraph +extends GeneratedGraph<LongValue> { + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter vertexDegree = new LongParameter(this, "vertex_degree") + .setMinimumValue(MINIMUM_VERTEX_DEGREE); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return EchoGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ":" + vertexDegree.getValue() + ")"; + } + + @Override + protected long vertexCount() { + return vertexCount.getValue(); + } + + @Override + protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception { + return new org.apache.flink.graph.generator.EchoGraph(env, vertexCount.getValue(), vertexDegree.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java index b41b86e..2ce3c77 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java @@ -54,7 +54,8 @@ extends GeneratedGraph<LongValue> { @Override public String getUsage() { - return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage(); + return "--" + 
PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]]" + + super.getUsage(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java index f566218..d3ba4fb 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java @@ -44,6 +44,50 @@ extends DriverBaseITCase { } @Test + public void testHashWithCirculantGraph() throws Exception { + long checksum; + switch (idType) { + case "byte": + case "nativeByte": + case "short": + case "nativeShort": + case "char": + case "nativeChar": + case "integer": + case "nativeInteger": + case "nativeLong": + checksum = 0x0000000000344448L; + break; + + case "long": + checksum = 0x0000000000a19d48L; + break; + + case "string": + case "nativeString": + checksum = 0x000000000c47ca48L; + break; + + default: + throw new IllegalArgumentException("Unknown type: " + idType); + } + + expectedChecksum( + parameters("CirculantGraph", "hash", "--vertex_count", "42", "--range0", "13:4"), + 168, checksum); + } + + @Test + public void testPrintWithCirculantGraph() throws Exception { + // skip 'char' since it is not printed as a number + Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar")); + + expectedOutputChecksum( + parameters("CirculantGraph", "print", "--vertex_count", "42", "--range0", "13:4"), + new Checksum(168, 0x0000004bdcc52cbcL)); + } + + @Test public void testLongDescription() throws Exception { String expected = 
regexSubstring(new EdgeList().getLongDescription()); @@ -142,10 +186,54 @@ extends DriverBaseITCase { } @Test + public void testHashWithEchoGraph() throws Exception { + long checksum; + switch (idType) { + case "byte": + case "nativeByte": + case "short": + case "nativeShort": + case "char": + case "nativeChar": + case "integer": + case "nativeInteger": + case "nativeLong": + checksum = 0x0000000000a9ddeaL; + break; + + case "long": + checksum = 0x00000000020d3f2aL; + break; + + case "string": + case "nativeString": + checksum = 0x0000000027e9516aL; + break; + + default: + throw new IllegalArgumentException("Unknown type: " + idType); + } + + expectedChecksum( + parameters("EchoGraph", "hash", "--vertex_count", "42", "--vertex_degree", "13"), + 546, checksum); + } + + @Test + public void testPrintWithEchoGraph() throws Exception { + // skip 'char' since it is not printed as a number + Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar")); + + expectedOutputChecksum( + parameters("EchoGraph", "print", "--vertex_count", "42", "--vertex_degree", "13"), + new Checksum(546, 0x000000f7190b8fcaL)); + } + + @Test public void testHashWithEmptyGraph() throws Exception { expectedChecksum( parameters("EmptyGraph", "hash", "--vertex_count", "42"), - 0, 0x0000000000000000); + 0, 0x0000000000000000L); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java new file mode 100644 index 0000000..9569b74 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * @see <a href="http://mathworld.wolfram.com/CirculantGraph.html">Circulant Graph at Wolfram MathWorld</a> + */ +public class CirculantGraph +extends AbstractGraphGenerator<LongValue, NullValue, NullValue> { + + public static final int MINIMUM_VERTEX_COUNT = 2; + + public static final int MINIMUM_OFFSET = 1; + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + private 
List<OffsetRange> offsetRanges = new ArrayList<>(); + + /** + * An oriented {@link Graph} with {@code n} vertices where each vertex + * v<sub>i</sub> is connected to vertex v<sub>(i+j)%n</sub> for each + * configured offset {@code j}. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + */ + public CirculantGraph(ExecutionEnvironment env, long vertexCount) { + Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT, + "Vertex count must be at least " + MINIMUM_VERTEX_COUNT); + + this.env = env; + this.vertexCount = vertexCount; + } + + /** + * Required configuration for each range of offsets in the graph. + * + * @param offset first offset appointing the vertices' position + * @param length number of contiguous offsets in range + * @return this + */ + public CirculantGraph addRange(long offset, long length) { + Preconditions.checkArgument(offset >= MINIMUM_OFFSET, + "Range offset must be at least " + MINIMUM_OFFSET); + Preconditions.checkArgument(length <= vertexCount - offset, + "Range length must not be greater than the vertex count minus the range offset."); + + offsetRanges.add(new OffsetRange(offset, length)); + + return this; + } + + @Override + public Graph<LongValue, NullValue, NullValue> generate() { + // Vertices + DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); + + // Validate ranges + Collections.sort(offsetRanges); + Iterator<OffsetRange> iter = offsetRanges.iterator(); + OffsetRange lastRange = iter.next(); + + while (iter.hasNext()) { + OffsetRange nextRange = iter.next(); + + if (lastRange.overlaps(nextRange)) { + throw new IllegalArgumentException("Overlapping ranges " + lastRange + " and " + nextRange); + } + + lastRange = nextRange; + } + + DataSet<Edge<LongValue, NullValue>> edges = env + .fromParallelCollection(iterator, 
LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .flatMap(new LinkVertexToOffsets(vertexCount, offsetRanges)) + .setParallelism(parallelism) + .name("Circulant graph edges"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + @FunctionAnnotation.ForwardedFields("*->f0") + private static class LinkVertexToOffsets + implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> { + private final long vertexCount; + + private final List<OffsetRange> offsetRanges; + + private LongValue target = new LongValue(); + + private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance()); + + public LinkVertexToOffsets(long vertexCount, List<OffsetRange> offsetRanges) { + this.vertexCount = vertexCount; + this.offsetRanges = offsetRanges; + } + + @Override + public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out) + throws Exception { + edge.f0 = source; + long sourceID = source.getValue(); + + for (OffsetRange offsetRange : offsetRanges) { + long targetID = sourceID + offsetRange.getOffset(); + + for (long i = offsetRange.getLength(); i > 0; i--) { + // add positive offset + target.setValue(targetID++ % vertexCount); + out.collect(edge); + } + } + } + } + + /** + * Stores the start offset and length configuration for an offset range. + */ + public static class OffsetRange implements Serializable, Comparable<OffsetRange> { + private long offset; + + private long length; + + /** + * Construct a range with the given offset and length. 
+ * + * @param offset the range offset + * @param length the range length + */ + public OffsetRange(long offset, long length) { + this.offset = offset; + this.length = length; + } + + /** + * Get the range offset + * + * @return the offset + */ + public long getOffset() { + return offset; + } + + /** + * Get the range length + * + * @return the length + */ + public long getLength() { + return length; + } + + /** + * Get the offset of the last index in the range + * + * @return last offset + */ + public long getLastOffset() { + return offset + length - 1; + } + + /** + * Return true if and only if the other range and this range share a + * common offset ID. + * + * @param other other range + * @return whether ranges are overlapping + */ + public boolean overlaps(OffsetRange other) { + boolean overlapping = false; + + long lastOffset = getLastOffset(); + long otherLastOffset = other.getLastOffset(); + + // check whether this range contains other + overlapping |= (offset <= other.offset && other.offset <= lastOffset); + overlapping |= (offset <= otherLastOffset && otherLastOffset <= lastOffset); + + // check whether other contains this range + overlapping |= (other.offset <= offset && offset <= otherLastOffset); + overlapping |= (other.offset <= lastOffset && lastOffset <= otherLastOffset); + + return overlapping; + } + + @Override + public String toString() { + return Long.toString(offset) + ":" + Long.toString(length); + } + + @Override + public int compareTo(OffsetRange o) { + int cmp = Long.compare(offset, o.offset); + if (cmp != 0) { + return cmp; + } + + return Long.compare(length, o.length); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index 11c0bb0..9dabe56 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -18,17 +18,10 @@ package org.apache.flink.graph.generator; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; -import org.apache.flink.util.LongValueSequenceIterator; import org.apache.flink.util.Preconditions; /** @@ -61,53 +54,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> { @Override public Graph<LongValue, NullValue, NullValue> generate() { - // Vertices - DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); - - // Edges - LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); - - DataSet<Edge<LongValue, NullValue>> edges = env - .fromParallelCollection(iterator, LongValue.class) + return new CirculantGraph(env, vertexCount) + .addRange(1, vertexCount - 1) .setParallelism(parallelism) - .name("Edge iterators") - .flatMap(new LinkVertexToAll(vertexCount)) - .setParallelism(parallelism) - .name("Complete graph edges"); - - // Graph - return Graph.fromDataSet(vertices, edges, env); - } - - @ForwardedFields("*->f0") - private static class LinkVertexToAll - implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> { - private final long vertexCount; - - private LongValue target = new LongValue(); 
- - private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance()); - - public LinkVertexToAll(long vertexCount) { - this.vertexCount = vertexCount; - } - - @Override - public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out) - throws Exception { - edge.f0 = source; - - long s = source.getValue(); - long t = (s + 1) % vertexCount; - - while (s != t) { - target.setValue(t); - out.collect(edge); - - if (++t == vertexCount) { - t = 0; - } - } - } + .generate(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java new file mode 100644 index 0000000..c15cdca --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Preconditions; + +/** + * A {@link CirculantGraph} with {@code n} vertices defined by the width of a + * single range of offsets centered at {@code n/2}. A vertex is connected to + * 'far' vertices, which connect to 'near' vertices, which connect to 'far' + * vertices, .... + * <p> + * Every {@link Vertex} in the {@link EchoGraph} has the same degree, + * and vertices as far apart as possible are chosen to be linked. + * {@link EchoGraph} is a specific case of {@link CirculantGraph}. + */ +public class EchoGraph +extends AbstractGraphGenerator<LongValue, NullValue, NullValue> { + + public static final int MINIMUM_VERTEX_COUNT = 2; + + public static final int MINIMUM_VERTEX_DEGREE = 1; + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + private long vertexDegree; + + /** + * An undirected {@link Graph} whose vertices have the same degree. 
+ * + * @param env the Flink execution environment + * @param vertexCount number of vertices + * @param vertexDegree degree of vertices + */ + public EchoGraph(ExecutionEnvironment env, long vertexCount, long vertexDegree) { + Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT, + "Vertex count must be at least " + MINIMUM_VERTEX_COUNT); + Preconditions.checkArgument(vertexDegree >= MINIMUM_VERTEX_DEGREE, + "Vertex degree must be at least " + MINIMUM_VERTEX_DEGREE); + Preconditions.checkArgument(vertexDegree < vertexCount, + "Vertex degree must be less than the vertex count."); + Preconditions.checkArgument(vertexCount % 2 == 0 ^ vertexDegree % 2 == 0, + "Vertex count or vertex degree must be an even number but not both."); + + this.env = env; + this.vertexCount = vertexCount; + this.vertexDegree = vertexDegree; + } + + @Override + public Graph<LongValue, NullValue, NullValue> generate() { + return new CirculantGraph(env, vertexCount) + .addRange((vertexCount - vertexDegree + 1) / 2, vertexDegree) + .setParallelism(parallelism) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java index 23a6f98..0570dd2 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java @@ -85,9 +85,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> { @Override public Graph<LongValue, NullValue, NullValue> generate() { - if (dimensions.isEmpty()) { - throw new RuntimeException("No dimensions added to GridGraph"); - } + 
Preconditions.checkState(!dimensions.isEmpty(), "No dimensions added to GridGraph"); // Vertices DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java new file mode 100644 index 0000000..aae88ca --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; + +public class CirculantGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10) + .addRange(4, 3) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" + + "2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" + + "5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" + + "8,2; 8,3; 8,4; 9,3; 9,4; 9,5"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 10; + int offset = 4; + int length = 2; + + Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10) + .addRange(offset, length) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(vertexCount * length, graph.numberOfEdges()); + + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); + + assertEquals(length, maxInDegree); + assertEquals(length, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10) + .addRange(4, 2) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>()); + graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>()); + + 
TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java new file mode 100644 index 0000000..777b576 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; + +public class EchoGraphTest +extends AbstractGraphTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testGraphWithEvenVertexCountWithOddVertexDegree() + throws Exception { + Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 10, 3) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" + + "2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" + + "5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" + + "8,2; 8,3; 8,4; 9,3; 9,4; 9,5"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphWithOddVertexCountWithEvenVertexDegree() + throws Exception { + Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 9, 2) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8"; + String edges = "0,4; 0,5; 1,5; 1,6; 2,6; 2,7;" + + "3,7; 3,8; 4,8; 4,0; 5,0; 5,1;" + + "6,1; 6,2; 7,2; 7,3; 8,3; 8,4"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphWithOddVertexCountWithOddVertexDegree() + throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Vertex count or vertex degree must be an even number but not both."); + + new EchoGraph(env, 5, 3).generate(); + } + + @Test + public void testGraphWithEvenVertexCountWithEvenVertexDegree() + throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Vertex count or vertex degree must be an even number but not both."); + + 
new EchoGraph(env, 6, 2).generate(); + } + + @Test + public void testGraphWithVertexDegreeTooLarge() + throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Vertex degree must be less than the vertex count."); + + new EchoGraph(env, 8, 8).generate(); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 10; + int vertexDegree = 3; + + Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, vertexCount, vertexDegree) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(vertexCount * vertexDegree, graph.numberOfEdges()); + + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); + + assertEquals(vertexDegree, maxInDegree); + assertEquals(vertexDegree, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 10, 3) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>()); + graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>()); + + TestUtils.verifyParallelism(env, parallelism); + } +}
