Repository: flink
Updated Branches:
  refs/heads/master 31e120a98 -> f1ff99fdc

[FLINK-4949] [gelly] Refactor Gelly driver inputs

The Gelly drivers started as simple wrappers around library algorithms but have grown to handle a matrix of input sources while often running multiple algorithms and analytics with custom parameterization. The monolithic drivers are replaced with separate inputs and algorithms. Command-line parameter parsers are shared and reusable across inputs and algorithms. Algorithm results now implement a common AlgorithmResult interface. Drivers are now tested with integration tests.

This closes #3294

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1ff99fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1ff99fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1ff99fd

Branch: refs/heads/master
Commit: f1ff99fdc1e228acd936f5684832d5cf49bdbe04
Parents: 31e120a
Author: Greg Hogan
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan
Committed: Fri Mar 31 15:57:54 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/index.md | 40 ++-
 flink-libraries/flink-gelly-examples/pom.xml | 9 +-
 .../java/org/apache/flink/graph/Runner.java | 357 +++++++++++++++++++
 .../main/java/org/apache/flink/graph/Usage.java | 71 ----
 .../org/apache/flink/graph/RunnerITCase.java | 122 +++++++
 .../flink/graph/drivers/AdamicAdarITCase.java | 52 +++
 .../drivers/ClusteringCoefficientITCase.java | 89 +++++
 .../drivers/ConnectedComponentsITCase.java | 65 ++++
 .../flink/graph/drivers/DriverBaseITCase.java | 185 ++++++++++
 .../flink/graph/drivers/EdgeListITCase.java | 240 +++++++++++++
 .../flink/graph/drivers/GraphMetricsITCase.java | 100 ++++++
 .../apache/flink/graph/drivers/HITSITCase.java | 52 +++
 .../flink/graph/drivers/JaccardIndexITCase.java | 63 ++++
 .../flink/graph/drivers/PageRankITCase.java | 52 +++
 .../graph/drivers/TriangleListingITCase.java | 107 ++++++
 15 files
changed, 1518 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/docs/dev/libs/gelly/index.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md index 8f8c6de..40018e8 100644 --- a/docs/dev/libs/gelly/index.md +++ b/docs/dev/libs/gelly/index.md @@ -73,7 +73,7 @@ Running Gelly Examples The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads") in the folder **opt** (for versions older than Flink 1.2 these can be manually downloaded from -[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly). +[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)). To run the Gelly examples the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to Flink's **lib** directory. @@ -83,21 +83,29 @@ cp opt/flink-gelly_*.jar lib/ cp opt/flink-gelly-scala_*.jar lib/ ~~~ -Gelly's examples jar includes both drivers for the library methods as well as additional example algorithms. After -configuring and starting the cluster, list the available algorithm classes: +Gelly's examples jar includes drivers for each of the library methods. After configuring and starting the cluster, list +the available algorithm classes: ~~~bash ./bin/start-cluster.sh ./bin/flink run opt/flink-gelly-examples_*.jar ~~~ -The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the -edge list from a CSV file. Each node in a cluster must have access to the input file. Calculate graph metrics on a -directed generated graph: +The Gelly drivers can generate graph data or read the edge list from a CSV file (each node in a cluster must have access +to the input file). 
The algorithm description, available inputs and outputs, and configuration are displayed when an +algorithm is selected. Print usage for [JaccardIndex](./library_methods.html#jaccard-index): ~~~bash -./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \ - --directed true --input rmat +./bin/flink run opt/flink-gelly-examples_*.jar --algorithm JaccardIndex +~~~ + +Display [graph metrics](./library_methods.html#metric) for a million vertex graph: + +~~~bash +./bin/flink run opt/flink-gelly-examples_*.jar \ + --algorithm GraphMetrics --order directed \ + --input RMatGraph --type integer --scale 20 --simplify directed \ + --output print ~~~ The size of the graph is adjusted by the *\-\-scale* and *\-\-edge_factor* parameters. The @@ -111,15 +119,19 @@ Run a few algorithms and monitor the job progress in Flink's Web UI: ~~~bash wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt -./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \ - --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' +./bin/flink run -q opt/flink-gelly-examples_*.jar \ + --algorithm GraphMetrics --order undirected \ + --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \ + --output print -./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/flink-gelly-examples_*.jar \ - --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \ +./bin/flink run -q opt/flink-gelly-examples_*.jar \ + --algorithm ClusteringCoefficient --order undirected \ + --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \ --output hash -./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex 
opt/flink-gelly-examples_*.jar \ - --input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \ +./bin/flink run -q opt/flink-gelly-examples_*.jar \ + --algorithm JaccardIndex \ + --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \ --output hash ~~~ http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml index b533119..e95aa37 100644 --- a/flink-libraries/flink-gelly-examples/pom.xml +++ b/flink-libraries/flink-gelly-examples/pom.xml @@ -83,6 +83,13 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -163,7 +170,7 @@ <configuration> <archive> <manifestEntries> - <Main-Class>org.apache.flink.graph.Usage</Main-Class> + <Main-Class>org.apache.flink.graph.Runner</Main-Class> </manifestEntries> </archive> </configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java new file mode 100644 index 0000000..0324814 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.drivers.AdamicAdar; +import org.apache.flink.graph.drivers.ClusteringCoefficient; +import org.apache.flink.graph.drivers.ConnectedComponents; +import org.apache.flink.graph.drivers.Driver; +import org.apache.flink.graph.drivers.EdgeList; +import org.apache.flink.graph.drivers.GraphMetrics; +import org.apache.flink.graph.drivers.HITS; +import org.apache.flink.graph.drivers.JaccardIndex; +import org.apache.flink.graph.drivers.PageRank; +import org.apache.flink.graph.drivers.TriangleListing; +import org.apache.flink.graph.drivers.input.CompleteGraph; +import org.apache.flink.graph.drivers.input.CycleGraph; +import org.apache.flink.graph.drivers.input.EmptyGraph; +import org.apache.flink.graph.drivers.input.GridGraph; +import org.apache.flink.graph.drivers.input.HypercubeGraph; +import org.apache.flink.graph.drivers.input.Input; +import 
org.apache.flink.graph.drivers.input.PathGraph; +import org.apache.flink.graph.drivers.input.RMatGraph; +import org.apache.flink.graph.drivers.input.SingletonEdgeGraph; +import org.apache.flink.graph.drivers.input.StarGraph; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.Parameterized; +import org.apache.flink.util.InstantiationUtil; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * This default main class executes Flink drivers. + * + * An execution has one input, one algorithm, and one output. Anything more + * complex can be expressed as a user program written in a JVM language. + * + * Inputs and algorithms are decoupled by, respectively, producing and + * consuming a graph. Currently only {@code Graph} is supported but later + * updates may add support for new graph types such as {@code BipartiteGraph}. + * + * Algorithms must explicitly support each type of output via implementation of + * interfaces. This is scalable as the number of outputs is small and finite. 
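The class comment above describes how the Runner wires one input, one algorithm, and one output, with each output enabled only when the algorithm implements the matching interface. The following is a minimal, dependency-free sketch of that dispatch pattern under stated assumptions, not Gelly code: `Input`, `Algorithm`, `PrintOutput`, `HashOutput`, `CsvOutput`, `EdgeCount`, and `run` are hypothetical stand-ins, and a "graph" is reduced to a list of `long[]` edges. It mirrors the `instanceof` checks in `Runner.main`, throwing `IllegalArgumentException` where the Runner throws `ProgramParametrizationException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch of the input -> algorithm -> output wiring.
// None of these types are Gelly classes; a "graph" is just a list of edges.
public class CapabilityDispatchSketch {

    /** Produces a graph (here: a list of {source, target} edges). */
    interface Input {
        List<long[]> create();
    }

    /** Consumes the graph produced by an Input. */
    interface Algorithm {
        void plan(List<long[]> edges);
    }

    /** Optional output capabilities; an algorithm opts in by implementing them. */
    interface PrintOutput {
        String print();
    }

    interface HashOutput {
        long hash();
    }

    interface CsvOutput {
        String csv();
    }

    /** Example algorithm supporting "print" and "hash" but not "csv". */
    static class EdgeCount implements Algorithm, PrintOutput, HashOutput {
        private long count;

        @Override
        public void plan(List<long[]> edges) {
            count = edges.size();
        }

        @Override
        public String print() {
            return "edge count: " + count;
        }

        @Override
        public long hash() {
            return Long.hashCode(count);
        }
    }

    /** Runs one input, one algorithm, and one output selected by name. */
    static String run(Input input, Algorithm algorithm, String outputName) {
        algorithm.plan(input.create());

        switch (outputName.toLowerCase()) {
            case "print":
                if (algorithm instanceof PrintOutput) {
                    return ((PrintOutput) algorithm).print();
                }
                break;
            case "hash":
                if (algorithm instanceof HashOutput) {
                    return "hash: " + ((HashOutput) algorithm).hash();
                }
                break;
            case "csv":
                if (algorithm instanceof CsvOutput) {
                    return ((CsvOutput) algorithm).csv();
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown output type: " + outputName);
        }

        // Known output name, but the algorithm does not implement that capability.
        throw new IllegalArgumentException(
            "Algorithm does not support output type '" + outputName + "'");
    }

    public static void main(String[] args) {
        // Path graph 0-1-2-3 with three edges.
        Input path = () -> {
            List<long[]> edges = new ArrayList<>();
            for (long i = 0; i < 3; i++) {
                edges.add(new long[]{i, i + 1});
            }
            return edges;
        };

        System.out.println(run(path, new EdgeCount(), "print")); // prints "edge count: 3"
    }
}
```

Because each output is an opt-in interface, supporting a new output means adding one interface and one `case`, which is consistent with the comment's argument that this stays manageable while the set of outputs is small.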
+ */ +public class Runner { + + private static final String INPUT = "input"; + + private static final String ALGORITHM = "algorithm"; + + private static final String OUTPUT = "output"; + + private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>() + .addClass(CompleteGraph.class) + .addClass(org.apache.flink.graph.drivers.input.CSV.class) + .addClass(CycleGraph.class) + .addClass(EmptyGraph.class) + .addClass(GridGraph.class) + .addClass(HypercubeGraph.class) + .addClass(PathGraph.class) + .addClass(RMatGraph.class) + .addClass(SingletonEdgeGraph.class) + .addClass(StarGraph.class); + + private static ParameterizedFactory<Driver> driverFactory = new ParameterizedFactory<Driver>() + .addClass(AdamicAdar.class) + .addClass(ClusteringCoefficient.class) + .addClass(ConnectedComponents.class) + .addClass(EdgeList.class) + .addClass(GraphMetrics.class) + .addClass(HITS.class) + .addClass(JaccardIndex.class) + .addClass(PageRank.class) + .addClass(TriangleListing.class); + + /** + * List available algorithms. This is displayed to the user when no valid + * algorithm is given in the program parameterization. + * + * @return usage string listing available algorithms + */ + private static String getAlgorithmsListing() { + StrBuilder strBuilder = new StrBuilder(); + + strBuilder + .appendNewLine() + .appendln("Select an algorithm to view usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm <algorithm>") + .appendNewLine() + .appendln("Available algorithms:"); + + for (Driver algorithm : driverFactory) { + strBuilder.append(" ") + .appendFixedWidthPadRight(algorithm.getName(), 30, ' ') + .append(algorithm.getShortDescription()).appendNewLine(); + } + + return strBuilder.toString(); + } + + /** + * Display the usage for the given algorithm. This includes options for all + * compatible inputs, the selected algorithm, and outputs implemented by + * the selected algorithm. 
+ * + * @param algorithmName unique identifier of the selected algorithm + * @return usage string for the given algorithm + */ + private static String getAlgorithmUsage(String algorithmName) { + StrBuilder strBuilder = new StrBuilder(); + + Driver algorithm = driverFactory.get(algorithmName); + + strBuilder + .appendNewLine() + .appendNewLine() + .appendln(algorithm.getLongDescription()) + .appendNewLine() + .append("usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm ") + .append(algorithmName) + .append(" [algorithm options] --input <input> [input options] --output <output> [output options]") + .appendNewLine() + .appendNewLine() + .appendln("Available inputs:"); + + for (Input input : inputFactory) { + strBuilder + .append(" --input ") + .append(input.getName()) + .append(" ") + .appendln(input.getUsage()); + } + + String algorithmParameterization = algorithm.getUsage(); + + if (algorithmParameterization.length() > 0) { + strBuilder + .appendNewLine() + .appendln("Algorithm configuration:") + .append(" ") + .appendln(algorithm.getUsage()); + } + + strBuilder + .appendNewLine() + .appendln("Available outputs:"); + + if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) { + strBuilder.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); + } + + if (algorithm instanceof Hash) { + strBuilder.appendln(" --output hash"); + } + + if (algorithm instanceof Print) { + strBuilder.appendln(" --output print"); + } + + return strBuilder + .appendNewLine() + .toString(); + } + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // should not have any non-Flink data types + env.getConfig().disableAutoTypeRegistration(); + env.getConfig().disableForceAvro(); + env.getConfig().disableForceKryo(); + + ParameterTool parameters = 
ParameterTool.fromArgs(args); + env.getConfig().setGlobalJobParameters(parameters); + + // integration tests run with object reuse both disabled and enabled + if (parameters.has("__disable_object_reuse")) { + env.getConfig().disableObjectReuse(); + } else { + env.getConfig().enableObjectReuse(); + } + + // Usage + + if (!parameters.has(ALGORITHM)) { + throw new ProgramParametrizationException(getAlgorithmsListing()); + } + + String algorithmName = parameters.get(ALGORITHM); + Driver algorithm = driverFactory.get(algorithmName); + + if (algorithm == null) { + throw new ProgramParametrizationException("Unknown algorithm name: " + algorithmName); + } + + if (!parameters.has(INPUT)) { + if (!parameters.has(OUTPUT)) { + // if neither input nor output is given then print algorithm usage + throw new ProgramParametrizationException(getAlgorithmUsage(algorithmName)); + } + throw new ProgramParametrizationException("No input given"); + } + + String inputName = parameters.get(INPUT); + Input input = inputFactory.get(inputName); + + if (input == null) { + throw new ProgramParametrizationException("Unknown input type: " + inputName); + } + + // Input + + input.configure(parameters); + Graph graph = input.create(env); + + // Algorithm + + algorithm.configure(parameters); + algorithm.plan(graph); + + // Output + if (!parameters.has(OUTPUT)) { + throw new ProgramParametrizationException("No output given"); + } + + String outputName = parameters.get(OUTPUT); + String executionNamePrefix = input.getIdentity() + " -> " + algorithmName + " -> "; + + System.out.println(); + + switch (outputName.toLowerCase()) { + case "csv": + if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) { + String filename = parameters.getRequired("output_filename"); + + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + + String fieldDelimiter = 
StringEscapeUtils.unescapeJava( +
parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + + org.apache.flink.graph.drivers.output.CSV c = (org.apache.flink.graph.drivers.output.CSV) algorithm; + c.writeCSV(filename, lineDelimiter, fieldDelimiter); + + env.execute(executionNamePrefix + "CSV"); + } else { + throw new ProgramParametrizationException("Algorithm does not support output type 'CSV'"); + } + break; + + case "hash": + if (algorithm instanceof Hash) { + Hash h = (Hash) algorithm; + h.hash(executionNamePrefix + "Hash"); + } else { + throw new ProgramParametrizationException("Algorithm does not support output type 'hash'"); + } + break; + + case "print": + if (algorithm instanceof Print) { + Print h = (Print) algorithm; + h.print(executionNamePrefix + "Print"); + } else { + throw new ProgramParametrizationException("Algorithm does not support output type 'print'"); + } + break; + + default: + throw new ProgramParametrizationException("Unknown output type: " + outputName); + } + } + + /** + * Stores a list of classes for which an instance can be requested by name + * and implements an iterator over class instances. + * + * @param <T> base type for stored classes + */ + private static class ParameterizedFactory<T extends Parameterized> + implements Iterable<T> { + private List<Class<? extends T>> classes = new ArrayList<>(); + + /** + * Add a class to the factory. + * + * @param cls subclass of T + * @return this + */ + public ParameterizedFactory<T> addClass(Class<? extends T> cls) { + this.classes.add(cls); + return this; + } + + /** + * Obtain a class instance by name. 
+ * + * @param name String matching getName() + * @return class instance or null if no matching class + */ + public T get(String name) { + for (T instance : this) { + if (name.equals(instance.getName())) { + return instance; + } + } + + return null; + } + + @Override + public Iterator<T> iterator() { + return new Iterator<T>() { + private int index; + + @Override + public boolean hasNext() { + return index < classes.size(); + } + + @Override + public T next() { + return InstantiationUtil.instantiate(classes.get(index++)); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java deleted file mode 100644 index 642fe5b..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph; - -import org.apache.commons.lang3.text.StrBuilder; -import org.apache.flink.client.program.ProgramParametrizationException; - -/** - * This default main class prints usage listing available classes. - */ -public class Usage { - - private static final Class[] DRIVERS = new Class[]{ - org.apache.flink.graph.drivers.ClusteringCoefficient.class, - org.apache.flink.graph.drivers.GraphMetrics.class, - org.apache.flink.graph.drivers.HITS.class, - org.apache.flink.graph.drivers.JaccardIndex.class, - org.apache.flink.graph.drivers.TriangleListing.class, - }; - - private static final Class[] EXAMPLES = new Class[]{ - org.apache.flink.graph.examples.EuclideanGraphWeighing.class, - org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class, - org.apache.flink.graph.examples.IncrementalSSSP.class, - org.apache.flink.graph.examples.MusicProfiles.class, - org.apache.flink.graph.examples.PregelSSSP.class, - org.apache.flink.graph.examples.SingleSourceShortestPaths.class, - org.apache.flink.graph.scala.examples.ConnectedComponents.class, - org.apache.flink.graph.scala.examples.GSASingleSourceShortestPaths.class, - org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class, - }; - - private static String getUsage() { - StrBuilder strBuilder = new StrBuilder(); - - strBuilder.appendNewLine(); - strBuilder.appendln("Driver classes call algorithms from the Gelly library:"); - for (Class cls : DRIVERS) { - strBuilder.append(" ").appendln(cls.getName()); - } - - strBuilder.appendNewLine(); - strBuilder.appendln("Example classes illustrate Gelly APIs or alternative algorithms:"); - for (Class cls : EXAMPLES) { - strBuilder.append(" ").appendln(cls.getName()); - } - - return strBuilder.toString(); - } - - public static void main(String[] args) throws Exception { - // this exception is throw to prevent Flink from printing an 
error message - throw new ProgramParametrizationException(getUsage()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java new file mode 100644 index 0000000..a48fdf1 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.drivers.DriverBaseITCase; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class RunnerITCase +extends DriverBaseITCase { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + public RunnerITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testWithoutAlgorithm() throws Exception { + String expected = "Select an algorithm to view usage:"; + + thrown.expect(ProgramParametrizationException.class); + thrown.expectMessage(expected); + + expectedOutput(new String[]{}, expected); + } + + @Test + public void testWithUnknownAlgorithm() throws Exception { + String expected = "Unknown algorithm name: NotAnAlgorithm"; + + thrown.expect(ProgramParametrizationException.class); + thrown.expectMessage(expected); + + expectedOutput(new String[]{"--algorithm", "NotAnAlgorithm"}, expected); + } + + @Test + public void testAlgorithmUsage() throws Exception { + String expected = "Pass-through of the graph's edge list."; + + thrown.expect(ProgramParametrizationException.class); + thrown.expectMessage(expected); + + expectedOutput(new String[]{"--algorithm", "EdgeList"}, expected); + } + + @Test + public void testWithoutInput() throws Exception { + String expected = "No input given"; + + thrown.expect(ProgramParametrizationException.class); + thrown.expectMessage(expected); + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--output", "NotAnOutput"}, + expected); + } + + @Test + public void testWithUnknownInput() throws Exception { + String expected = "Unknown input type: NotAnInput"; + + thrown.expect(ProgramParametrizationException.class); + thrown.expectMessage(expected); + + expectedOutput( + new String[]{"--algorithm", 
"EdgeList", + "--input", "NotAnInput"}, + expected); + } + + @Test + public void testWithoutOutput() throws Exception { + String expected = "No output given"; + + thrown.expect(ProgramParametrizationException.class); + thrown.expectMessage(expected); + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph"}, + expected); + } + + @Test + public void testWithUnknownOutput() throws Exception { + String expected = "Unknown output type: NotAnOutput"; + + thrown.expect(ProgramParametrizationException.class); + thrown.expectMessage(expected); + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", + "--output", "NotAnOutput"}, + expected); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java new file mode 100644 index 0000000..400c241 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AdamicAdarITCase +extends DriverBaseITCase { + + public AdamicAdarITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new AdamicAdar().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "AdamicAdar"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "AdamicAdar", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "print"}, + 221628); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java new file mode 100644 index 0000000..f215b91 --- /dev/null +++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ClusteringCoefficientITCase +extends DriverBaseITCase { + + public ClusteringCoefficientITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new ClusteringCoefficient().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "ClusteringCoefficient"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testDirectedHashWithRMatIntegerGraph() throws Exception { + String expected = "\n" + + "ChecksumHashCode 0x000001c0409df6c0, count 902\n" + + "triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" + + "vertex count: 902, average clustering coefficient: 0.32943748[0-9]+\n"; + + 
expectedOutput( + new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output", "hash"}, + expected); + } + + @Test + public void testDirectedPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output", "print"}, + 904); + } + + @Test + public void testUndirectedHashWithRMatIntegerGraph() throws Exception { + String expected = "\n" + + "ChecksumHashCode 0x000001ccf8c45fdb, count 902\n" + + "triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" + + "vertex count: 902, average clustering coefficient: 0.42173070[0-9]+\n"; + + expectedOutput( + new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "hash"}, + expected); + } + + @Test + public void testUndirectedPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "print"}, + 904); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java new file mode 100644 index 0000000..b91abb3 --- /dev/null +++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ConnectedComponentsITCase +extends DriverBaseITCase { + + public ConnectedComponentsITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new ConnectedComponents().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "ConnectedComponents"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testHashWithRMatIntegerGraph() throws Exception { + String expected = "\\nChecksumHashCode 0x0000000000cdc7e7, count 838\\n"; + + expectedOutput( + new String[]{"--algorithm", "ConnectedComponents", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1", + "--a", "0.25", "--b", "0.25", 
"--c", "0.25", "--noise_enabled", "--noise", "1.0", + "--output", "hash"}, + expected); + } + + @Test + public void testPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "ConnectedComponents", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1", + "--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0", + "--output", "print"}, + 838); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java new file mode 100644 index 0000000..d19ca97 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.flink.graph.Runner; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.FlinkRuntimeException; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.runners.Parameterized; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.regex.Pattern; + +public abstract class DriverBaseITCase +extends MultipleProgramsTestBase { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + protected DriverBaseITCase(TestExecutionMode mode) { + super(mode); + } + + // extend MultipleProgramsTestBase default to include object reuse mode + @Parameterized.Parameters(name = "Execution mode = {0}") + public static Collection<Object[]> executionModes() { + return Arrays.asList( + new Object[] { TestExecutionMode.CLUSTER }, + new Object[] { TestExecutionMode.CLUSTER_OBJECT_REUSE }, + new Object[] { TestExecutionMode.COLLECTION }); + } + + /** + * Simpler variant of {@link #expectedOutput(String[], String)} + * that only compares the number of records in standard output. + * This is intended for algorithms whose results cannot be + * hashed because they are approximate (typically floating point arithmetic).
+ * + * @param parameters algorithm, input, and output arguments + * @param records expected number of records in standard output + * @throws Exception on error + */ + protected void expectedCount(String[] parameters, int records) throws Exception { + String output = getSystemOutput(parameters); + + // subtract the extra newline + int numberOfRecords = output.split(System.getProperty("line.separator")).length - 1; + Assert.assertEquals(records, numberOfRecords); + } + + /** + * Executes the driver with the provided arguments and compares the + * standard output with the given regular expression. + * + * @param parameters algorithm, input, and output arguments + * @param expected expected standard output, as a regular expression + * @throws Exception on error + */ + protected void expectedOutput(String[] parameters, String expected) throws Exception { + String output = getSystemOutput(parameters); + + Assert.assertThat(output, RegexMatcher.matchesRegex(expected)); + } + + /** + * Executes the driver with the provided arguments and compares the + * thrown exception's class and message with the given class and regular + * expression. + * + * @param parameters algorithm, input, and output arguments + * @param expected expected exception message, as a regular expression + * @param exception expected exception class + * @throws Exception on error when not matching exception + */ + protected void expectedOutputFromException(String[] parameters, String expected, Class<? extends Throwable> exception) throws Exception { + expectedException.expect(exception); + expectedException.expectMessage(RegexMatcher.matchesRegex(expected)); + + getSystemOutput(parameters); + } + + /** + * Generate a regular expression string by quoting the input string and + * adding wildcard matchers to the beginning and end. + * + * @param input source string + * @return regex string + */ + protected String regexSubstring(String input) { + // Pattern.quote disables regex interpretation of the input string and + // flag expression "(?s)" (Pattern.DOTALL) matches "."
against any + // character including line terminators + return "(?s).*" + Pattern.quote(input) + ".*"; + } + + /** + * Capture the command-line standard output from the driver execution. + * + * @param args driver command-line arguments + * @return standard output from driver execution + * @throws Exception on error + */ + private String getSystemOutput(String[] args) throws Exception { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + // Configure object reuse mode + switch (mode) { + case CLUSTER: + case COLLECTION: + args = (String[])ArrayUtils.add(args, "--__disable_object_reuse"); + break; + + case CLUSTER_OBJECT_REUSE: + // object reuse is enabled by default when executing drivers + break; + + default: + throw new FlinkRuntimeException("Unknown execution mode " + mode); + } + + // Redirect stdout + PrintStream stdout = System.out; + System.setOut(new PrintStream(output)); + try { + Runner.main(args); + } finally { + // Restore stdout, even when the driver throws + System.setOut(stdout); + } + return output.toString(); + } + + /** + * Implements a Hamcrest regex matcher. Hamcrest 2.0 provides + * Matchers.matchesPattern(String) but Flink depends on Hamcrest 1.3.
+ * + * see http://stackoverflow.com/a/25021229 + */ + private static class RegexMatcher + extends TypeSafeMatcher<String> { + private final String regex; + + private RegexMatcher(final String regex) { + this.regex = regex; + } + + @Override + public void describeTo(final Description description) { + description.appendText("matches regex=`" + regex + "`"); + } + + @Override + public boolean matchesSafely(final String string) { + return string.matches(regex); + } + + public static RegexMatcher matchesRegex(final String regex) { + return new RegexMatcher(regex); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java new file mode 100644 index 0000000..d9cac8b --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class EdgeListITCase +extends DriverBaseITCase { + + public EdgeListITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new EdgeList().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "EdgeList"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testHashWithCompleteGraph() throws Exception { + String expected = "\nChecksumHashCode 0x0000000006788c22, count 1722\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "CompleteGraph", "--vertex_count", "42", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithCycleGraph() throws Exception { + String expected = "\nChecksumHashCode 0x000000000050cea4, count 84\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "CycleGraph", "--vertex_count", "42", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithEmptyGraph() throws Exception { + String expected = "\nChecksumHashCode 0x0000000000000000, count 0\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "EmptyGraph", "--vertex_count", "42", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithGridGraph() throws Exception { + String expected = "\nChecksumHashCode 0x00000000357d33a6, count 2990\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "GridGraph", "--dim0", "5:true", "--dim1", "8:false", "--dim2", "13:true", + "--output", "hash"}, + expected); + } + + @Test + 
public void testHashWithHypercubeGraph() throws Exception { + String expected = "\nChecksumHashCode 0x0000000014a72800, count 2048\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "HypercubeGraph", "--dimensions", "8", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithPathGraph() throws Exception { + String expected = "\nChecksumHashCode 0x00000000004ee21a, count 82\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "PathGraph", "--vertex_count", "42", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatIntegerGraph() throws Exception { + String expected = "\nChecksumHashCode 0x00000000ed469103, count 16384\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "integer", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatIntegerDirectedGraph() throws Exception { + String expected = "\nChecksumHashCode 0x00000000c53bfc9b, count 12009\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatIntegerUndirectedGraph() throws Exception { + String expected = "\nChecksumHashCode 0x00000001664eb9e4, count 20884\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatLongGraph() throws Exception { + String expected = "\nChecksumHashCode 0x0000000116ee9103, count 16384\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "long", + "--output", "hash"}, + expected); + } + + @Test + public void testPrintWithRMatLongGraph() throws Exception { + + expectedCount( + new String[]{"--algorithm", "EdgeList", + "--input", 
"RMatGraph", "--type", "long", + "--output", "print"}, + 16384); + } + + @Test + public void testHashWithRMatLongDirectedGraph() throws Exception { + String expected = "\nChecksumHashCode 0x00000000e3c4643b, count 12009\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "long", "--simplify", "directed", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatLongUndirectedGraph() throws Exception { + String expected = "\nChecksumHashCode 0x000000019b67ae64, count 20884\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "long", "--simplify", "undirected", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatStringGraph() throws Exception { + String expected = "\nChecksumHashCode 0x00000071dc80a623, count 16384\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "string", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatStringDirectedGraph() throws Exception { + String expected = "\nChecksumHashCode 0x0000005d58b3fa7d, count 12009\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "string", "--simplify", "directed", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithRMatStringUndirectedGraph() throws Exception { + String expected = "\nChecksumHashCode 0x000000aa54987304, count 20884\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "RMatGraph", "--type", "string", "--simplify", "undirected", + "--output", "hash"}, + expected); + } + + @Test + public void testHashWithSingletonEdgeGraph() throws Exception { + String expected = "\nChecksumHashCode 0x0000000001af8ee8, count 200\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "SingletonEdgeGraph", "--vertex_pair_count", "100", + "--output", "hash"}, + 
expected); + } + + @Test + public void testHashWithStarGraph() throws Exception { + String expected = "\nChecksumHashCode 0x000000000042789a, count 82\n"; + + expectedOutput( + new String[]{"--algorithm", "EdgeList", + "--input", "StarGraph", "--vertex_count", "42", + "--output", "hash"}, + expected); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java new file mode 100644 index 0000000..a5ea486 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphMetricsITCase +extends DriverBaseITCase { + + public GraphMetricsITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new GraphMetrics().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "GraphMetrics"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testWithDirectedRMatIntegerGraph() throws Exception { + String expected = "\n" + + "Vertex metrics:\n" + + " vertex count: 902\n" + + " edge count: 12,009\n" + + " unidirectional edge count: 8,875\n" + + " bidirectional edge count: 1,567\n" + + " average degree: 13.314\n" + + " density: 0.01477663\n" + + " triplet count: 1,003,442\n" + + " maximum degree: 463\n" + + " maximum out degree: 334\n" + + " maximum in degree: 342\n" + + " maximum triplets: 106,953\n" + + "\n" + + "Edge metrics:\n" + + " triangle triplet count: 107,817\n" + + " rectangle triplet count: 315,537\n" + + " maximum triangle triplets: 820\n" + + " maximum rectangle triplets: 3,822\n"; + + String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "directed", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output"}; + + expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected); + expectedOutput(ArrayUtils.addAll(arguments, "print"), expected); + } + + @Test + public void testWithUndirectedRMatIntegerGraph() throws Exception { + String expected = "\n" + + "Vertex metrics:\n" + + " vertex count: 902\n" + + " edge count: 10,442\n" + + " average degree: 23.153\n" + + " density: 0.025697\n" + + " triplet count: 1,003,442\n" 
+ + " maximum degree: 463\n" + + " maximum triplets: 106,953\n" + + "\n" + + "Edge metrics:\n" + + " triangle triplet count: 107,817\n" + + " rectangle triplet count: 315,537\n" + + " maximum triangle triplets: 820\n" + + " maximum rectangle triplets: 3,822\n"; + + String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "undirected", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output"}; + + expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected); + expectedOutput(ArrayUtils.addAll(arguments, "print"), expected); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java new file mode 100644 index 0000000..5474d1b --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class HITSITCase +extends DriverBaseITCase { + + public HITSITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new HITS().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "HITS"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "HITS", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output", "print"}, + 902); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java new file mode 100644 index 0000000..0632856 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class JaccardIndexITCase +extends DriverBaseITCase { + + public JaccardIndexITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new JaccardIndex().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "JaccardIndex"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testHashWithRMatIntegerGraph() throws Exception { + String expected = "\nChecksumHashCode 0x0001b188570b2572, count 221628\n"; + + expectedOutput( + new String[]{"--algorithm", "JaccardIndex", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "hash"}, + expected); + } + + @Test + public void testPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "JaccardIndex", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "print"}, + 221628); + } +} 
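The print-mode tests such as `testPrintWithRMatIntegerGraph` use `expectedCount` from `DriverBaseITCase`, which redirects `System.out` while the driver runs and then counts records by splitting the captured text on the line separator. The following is a minimal standalone sketch of that technique, not Flink code: `fakeDriver` is a hypothetical stand-in for `Runner.main`, and the sketch assumes (as the hash-mode expected strings beginning with `\n` suggest) that the driver prints a newline before its first record.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.regex.Pattern;

public class RecordCountSketch {

    // Hypothetical stand-in for Runner.main: prints a leading newline, then one record per line.
    static void fakeDriver() {
        System.out.println();
        System.out.println("(0,1,0.5)");
        System.out.println("(0,2,0.25)");
    }

    // Captures standard output while the task runs and restores System.out even if the task throws.
    static String captureStdout(Runnable task) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream original = System.out;
        System.setOut(new PrintStream(buffer));
        try {
            task.run();
        } finally {
            System.setOut(original);
        }
        return buffer.toString();
    }

    // String.split drops trailing empty strings but keeps the leading empty string
    // produced by the initial newline, so one element is subtracted.
    static int countRecords(String output) {
        String separator = System.getProperty("line.separator");
        return output.split(Pattern.quote(separator)).length - 1;
    }

    public static void main(String[] args) {
        String output = captureStdout(RecordCountSketch::fakeDriver);
        System.out.println(countRecords(output)); // prints 2
    }
}
```

Restoring `System.out` in a `finally` block keeps the redirection from leaking into later tests when the driver exits with an exception, which the `testLongDescription` cases deliberately provoke. Quoting the separator with `Pattern.quote` is a small deviation from the test base, which passes the raw separator to `String.split`.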
http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java new file mode 100644 index 0000000..d7301d0 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class PageRankITCase +extends DriverBaseITCase { + + public PageRankITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new PageRank().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "PageRank"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "PageRank", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output", "print"}, + 902); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java new file mode 100644 index 0000000..0d2897c --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.client.program.ProgramParametrizationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TriangleListingITCase +extends DriverBaseITCase { + + public TriangleListingITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testLongDescription() throws Exception { + String expected = regexSubstring(new TriangleListing().getLongDescription()); + + expectedOutputFromException( + new String[]{"--algorithm", "TriangleListing"}, + expected, + ProgramParametrizationException.class); + } + + @Test + public void testDirectedHashWithRMatIntegerGraph() throws Exception { + String expected = "\n" + + "ChecksumHashCode 0x0000001beffe6edd, count 75049\n" + + "Triadic census:\n" + + " 003: 113,435,893\n" + + " 012: 6,632,528\n" + + " 102: 983,535\n" + + " 021d: 118,574\n" + + " 021u: 118,566\n" + + " 021c: 237,767\n" + + " 111d: 129,773\n" + + " 111u: 130,041\n" + + " 030t: 16,981\n" + + " 030c: 5,535\n" + + " 201: 43,574\n" + + " 120d: 7,449\n" + + " 120u: 7,587\n" + + " 120c: 15,178\n" + + " 210: 17,368\n" + + " 300: 4,951\n"; + + expectedOutput( + new String[]{"--algorithm", "TriangleListing", "--order", "directed", "--sort_triangle_vertices", "--triadic_census", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output", "hash"}, + expected); + } + + @Test + public void testDirectedPrintWithRMatIntegerGraph() throws Exception { + 
expectedCount( + new String[]{"--algorithm", "TriangleListing", "--order", "directed", + "--input", "RMatGraph", "--type", "integer", "--simplify", "directed", + "--output", "print"}, + 75049); + } + + @Test + public void testUndirectedHashWithRMatIntegerGraph() throws Exception { + String expected = "\n" + + "ChecksumHashCode 0x00000000e6b3f32c, count 75049\n" + + "Triadic census:\n" + + " 03: 113,435,893\n" + + " 12: 7,616,063\n" + + " 21: 778,295\n" + + " 30: 75,049\n"; + + expectedOutput( + new String[]{"--algorithm", "TriangleListing", "--order", "undirected", "--sort_triangle_vertices", "--triadic_census", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "hash"}, + expected); + } + + @Test + public void testUndirectedPrintWithRMatIntegerGraph() throws Exception { + expectedCount( + new String[]{"--algorithm", "TriangleListing", "--order", "undirected", + "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", + "--output", "print"}, + 75049); + } +}
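Every `testLongDescription` case depends on `regexSubstring` and `RegexMatcher` in `DriverBaseITCase`. Because `String.matches` must match the entire exception message, the quoted description is wrapped in `.*` wildcards, and the `(?s)` flag (`Pattern.DOTALL`) lets `.` span the line breaks of a multi-line usage text. The sketch below reproduces only that pattern construction; the message string is invented for illustration and is not a real driver description.

```java
import java.util.regex.Pattern;

public class RegexSubstringSketch {

    // Same construction as DriverBaseITCase#regexSubstring: quote the literal text and
    // allow any characters, including line terminators, before and after it.
    static String regexSubstring(String input) {
        return "(?s).*" + Pattern.quote(input) + ".*";
    }

    public static void main(String[] args) {
        // Hypothetical multi-line exception message.
        String message = "No input given\n\nusage: (a + b) * c\n";
        String description = "usage: (a + b) * c";

        // Whole-string match succeeds thanks to the wildcards and DOTALL.
        System.out.println(message.matches(regexSubstring(description))); // true

        // Without (?s), '.' cannot cross the line breaks around the description.
        System.out.println(message.matches(".*" + Pattern.quote(description) + ".*")); // false

        // Without quoting, '(', '+' and '*' are interpreted as regex metacharacters.
        System.out.println(message.matches("(?s).*" + description + ".*")); // false
    }
}
```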
