Repository: flink
Updated Branches:
  refs/heads/master 31e120a98 -> f1ff99fdc


[FLINK-4949] [gelly] Refactor Gelly driver inputs

The Gelly drivers started as simple wrappers around library algorithms
but have grown to handle a matrix of input sources while often running
multiple algorithms and analytics with custom parameterization.

The monolithic drivers are replaced with separate inputs and algorithms.
Command-line parameter parsers are shared and reusable across inputs and
algorithms. Algorithm results now implement a common AlgorithmResult
interface. Drivers are now tested with integration tests.
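
As a rough illustration of that decoupling, inputs and algorithms can be
registered by name and paired at run time. This is a sketch only: the names
RunnerSketch, Factory, and EdgeCount, and the list-of-edges stand-in for a
graph, are hypothetical and are not the classes added in this commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch; these types are illustrative, not the Gelly driver API. */
public class RunnerSketch {

    /** Anything that can be selected by name on the command line. */
    interface Named {
        String getName();
    }

    /** An input produces a graph, reduced here to a list of {source, target} edges. */
    interface Input extends Named {
        List<int[]> create(int vertexCount);
    }

    /** An algorithm consumes a graph and yields a printable result. */
    interface Algorithm extends Named {
        String run(List<int[]> edges);
    }

    /** Registry returning a fresh instance for a name, or null if none matches. */
    static final class Factory<T extends Named> {
        private final List<Supplier<T>> suppliers = new ArrayList<>();

        Factory<T> add(Supplier<T> supplier) {
            suppliers.add(supplier);
            return this;
        }

        T get(String name) {
            for (Supplier<T> supplier : suppliers) {
                T instance = supplier.get();
                if (name.equals(instance.getName())) {
                    return instance;
                }
            }
            return null;
        }
    }

    /** Vertices 0..n-1 joined in a line: n-1 edges. */
    static final class PathGraph implements Input {
        public String getName() { return "PathGraph"; }

        public List<int[]> create(int vertexCount) {
            List<int[]> edges = new ArrayList<>();
            for (int i = 0; i + 1 < vertexCount; i++) {
                edges.add(new int[]{i, i + 1});
            }
            return edges;
        }
    }

    /** A path plus one closing edge back to vertex 0. */
    static final class CycleGraph implements Input {
        public String getName() { return "CycleGraph"; }

        public List<int[]> create(int vertexCount) {
            List<int[]> edges = new PathGraph().create(vertexCount);
            if (vertexCount > 2) {
                edges.add(new int[]{vertexCount - 1, 0});
            }
            return edges;
        }
    }

    /** Toy algorithm: reports the number of edges. */
    static final class EdgeCount implements Algorithm {
        public String getName() { return "EdgeCount"; }

        public String run(List<int[]> edges) { return "edges=" + edges.size(); }
    }

    static final Factory<Input> INPUTS =
        new Factory<Input>().add(PathGraph::new).add(CycleGraph::new);

    static final Factory<Algorithm> ALGORITHMS =
        new Factory<Algorithm>().add(EdgeCount::new);

    /** Look up both parts by name, then feed the input's graph to the algorithm. */
    public static String run(String algorithmName, String inputName, int vertexCount) {
        Algorithm algorithm = ALGORITHMS.get(algorithmName);
        if (algorithm == null) {
            throw new IllegalArgumentException("Unknown algorithm name: " + algorithmName);
        }
        Input input = INPUTS.get(inputName);
        if (input == null) {
            throw new IllegalArgumentException("Unknown input type: " + inputName);
        }
        return algorithm.run(input.create(vertexCount));
    }

    public static void main(String[] args) {
        System.out.println(run("EdgeCount", "PathGraph", 5));
        System.out.println(run("EdgeCount", "CycleGraph", 5));
    }
}
```

Because any registered input can be paired with any registered algorithm,
adding a new input in this sketch does not require touching the algorithms.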

This closes #3294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1ff99fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1ff99fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1ff99fd

Branch: refs/heads/master
Commit: f1ff99fdc1e228acd936f5684832d5cf49bdbe04
Parents: 31e120a
Author: Greg Hogan <[email protected]>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Fri Mar 31 15:57:54 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/index.md                    |  40 ++-
 flink-libraries/flink-gelly-examples/pom.xml    |   9 +-
 .../java/org/apache/flink/graph/Runner.java     | 357 +++++++++++++++++++
 .../main/java/org/apache/flink/graph/Usage.java |  71 ----
 .../org/apache/flink/graph/RunnerITCase.java    | 122 +++++++
 .../flink/graph/drivers/AdamicAdarITCase.java   |  52 +++
 .../drivers/ClusteringCoefficientITCase.java    |  89 +++++
 .../drivers/ConnectedComponentsITCase.java      |  65 ++++
 .../flink/graph/drivers/DriverBaseITCase.java   | 185 ++++++++++
 .../flink/graph/drivers/EdgeListITCase.java     | 240 +++++++++++++
 .../flink/graph/drivers/GraphMetricsITCase.java | 100 ++++++
 .../apache/flink/graph/drivers/HITSITCase.java  |  52 +++
 .../flink/graph/drivers/JaccardIndexITCase.java |  63 ++++
 .../flink/graph/drivers/PageRankITCase.java     |  52 +++
 .../graph/drivers/TriangleListingITCase.java    | 107 ++++++
 15 files changed, 1518 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 8f8c6de..40018e8 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -73,7 +73,7 @@ Running Gelly Examples
 
 The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
 in the folder **opt** (for versions older than Flink 1.2 these can be manually downloaded from
-[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly).
+[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)).
 
 To run the Gelly examples the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to
 Flink's **lib** directory.
@@ -83,21 +83,29 @@ cp opt/flink-gelly_*.jar lib/
 cp opt/flink-gelly-scala_*.jar lib/
 ~~~
 
-Gelly's examples jar includes both drivers for the library methods as well as additional example algorithms. After
-configuring and starting the cluster, list the available algorithm classes:
+Gelly's examples jar includes drivers for each of the library methods. After configuring and starting the cluster, list
+the available algorithm classes:
 
 ~~~bash
 ./bin/start-cluster.sh
 ./bin/flink run opt/flink-gelly-examples_*.jar
 ~~~
 
-The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the
-edge list from a CSV file. Each node in a cluster must have access to the input file. Calculate graph metrics on a
-directed generated graph:
+The Gelly drivers can generate graph data or read the edge list from a CSV file (each node in a cluster must have access
+to the input file). The algorithm description, available inputs and outputs, and configuration are displayed when an
+algorithm is selected. Print usage for [JaccardIndex](./library_methods.html#jaccard-index):
 
 ~~~bash
-./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
-    --directed true --input rmat
+./bin/flink run opt/flink-gelly-examples_*.jar --algorithm JaccardIndex
+~~~
+
+Display [graph metrics](./library_methods.html#metric) for a million vertex graph:
+
+~~~bash
+./bin/flink run opt/flink-gelly-examples_*.jar \
+    --algorithm GraphMetrics --order directed \
+    --input RMatGraph --type integer --scale 20 --simplify directed \
+    --output print
 ~~~
 
 The size of the graph is adjusted by the *\-\-scale* and *\-\-edge_factor* parameters. The
@@ -111,15 +119,19 @@ Run a few algorithms and monitor the job progress in Flink's Web UI:
 ~~~bash
 wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
-    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t'
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+    --algorithm GraphMetrics --order undirected \
+    --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
+    --output print
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/flink-gelly-examples_*.jar \
-    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt  --input_field_delimiter '\t' \
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+    --algorithm ClusteringCoefficient --order undirected \
+    --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output hash
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/flink-gelly-examples_*.jar \
-    --input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+    --algorithm JaccardIndex \
+    --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output hash
 ~~~
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index b533119..e95aa37 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -83,6 +83,13 @@
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
@@ -163,7 +170,7 @@
                                <configuration>
                                        <archive>
                                                <manifestEntries>
-                                                       <Main-Class>org.apache.flink.graph.Usage</Main-Class>
+                                                       <Main-Class>org.apache.flink.graph.Runner</Main-Class>
                                                </manifestEntries>
                                        </archive>
                                </configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
new file mode 100644
index 0000000..0324814
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.drivers.AdamicAdar;
+import org.apache.flink.graph.drivers.ClusteringCoefficient;
+import org.apache.flink.graph.drivers.ConnectedComponents;
+import org.apache.flink.graph.drivers.Driver;
+import org.apache.flink.graph.drivers.EdgeList;
+import org.apache.flink.graph.drivers.GraphMetrics;
+import org.apache.flink.graph.drivers.HITS;
+import org.apache.flink.graph.drivers.JaccardIndex;
+import org.apache.flink.graph.drivers.PageRank;
+import org.apache.flink.graph.drivers.TriangleListing;
+import org.apache.flink.graph.drivers.input.CompleteGraph;
+import org.apache.flink.graph.drivers.input.CycleGraph;
+import org.apache.flink.graph.drivers.input.EmptyGraph;
+import org.apache.flink.graph.drivers.input.GridGraph;
+import org.apache.flink.graph.drivers.input.HypercubeGraph;
+import org.apache.flink.graph.drivers.input.Input;
+import org.apache.flink.graph.drivers.input.PathGraph;
+import org.apache.flink.graph.drivers.input.RMatGraph;
+import org.apache.flink.graph.drivers.input.SingletonEdgeGraph;
+import org.apache.flink.graph.drivers.input.StarGraph;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This default main class executes Flink drivers.
+ *
+ * An execution has one input, one algorithm, and one output. Anything more
+ * complex can be expressed as a user program written in a JVM language.
+ *
+ * Inputs and algorithms are decoupled by, respectively, producing and
+ * consuming a graph. Currently only {@code Graph} is supported but later
+ * updates may add support for new graph types such as {@code BipartiteGraph}.
+ *
+ * Algorithms must explicitly support each type of output via implementation of
+ * interfaces. This is scalable as the number of outputs is small and finite.
+ */
+public class Runner {
+
+       private static final String INPUT = "input";
+
+       private static final String ALGORITHM = "algorithm";
+
+       private static final String OUTPUT = "output";
+
+       private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>()
+               .addClass(CompleteGraph.class)
+               .addClass(org.apache.flink.graph.drivers.input.CSV.class)
+               .addClass(CycleGraph.class)
+               .addClass(EmptyGraph.class)
+               .addClass(GridGraph.class)
+               .addClass(HypercubeGraph.class)
+               .addClass(PathGraph.class)
+               .addClass(RMatGraph.class)
+               .addClass(SingletonEdgeGraph.class)
+               .addClass(StarGraph.class);
+
+       private static ParameterizedFactory<Driver> driverFactory = new ParameterizedFactory<Driver>()
+               .addClass(AdamicAdar.class)
+               .addClass(ClusteringCoefficient.class)
+               .addClass(ConnectedComponents.class)
+               .addClass(EdgeList.class)
+               .addClass(GraphMetrics.class)
+               .addClass(HITS.class)
+               .addClass(JaccardIndex.class)
+               .addClass(PageRank.class)
+               .addClass(TriangleListing.class);
+
+       /**
+        * List available algorithms. This is displayed to the user when no valid
+        * algorithm is given in the program parameterization.
+        *
+        * @return usage string listing available algorithms
+        */
+       private static String getAlgorithmsListing() {
+               StrBuilder strBuilder = new StrBuilder();
+
+               strBuilder
+                       .appendNewLine()
+                       .appendln("Select an algorithm to view usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm <algorithm>")
+                       .appendNewLine()
+                       .appendln("Available algorithms:");
+
+               for (Driver algorithm : driverFactory) {
+                       strBuilder.append("  ")
+                               .appendFixedWidthPadRight(algorithm.getName(), 30, ' ')
+                               .append(algorithm.getShortDescription()).appendNewLine();
+               }
+
+               return strBuilder.toString();
+       }
+
+       /**
+        * Display the usage for the given algorithm. This includes options for all
+        * compatible inputs, the selected algorithm, and outputs implemented by
+        * the selected algorithm.
+        *
+        * @param algorithmName unique identifier of the selected algorithm
+        * @return usage string for the given algorithm
+        */
+       private static String getAlgorithmUsage(String algorithmName) {
+               StrBuilder strBuilder = new StrBuilder();
+
+               Driver algorithm = driverFactory.get(algorithmName);
+
+               strBuilder
+                       .appendNewLine()
+                       .appendNewLine()
+                       .appendln(algorithm.getLongDescription())
+                       .appendNewLine()
+                       .append("usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm ")
+                       .append(algorithmName)
+                       .append(" [algorithm options] --input <input> [input options] --output <output> [output options]")
+                       .appendNewLine()
+                       .appendNewLine()
+                       .appendln("Available inputs:");
+
+               for (Input input : inputFactory) {
+                       strBuilder
+                               .append("  --input ")
+                               .append(input.getName())
+                               .append(" ")
+                               .appendln(input.getUsage());
+               }
+
+               String algorithmParameterization = algorithm.getUsage();
+
+               if (algorithmParameterization.length() > 0) {
+                       strBuilder
+                               .appendNewLine()
+                               .appendln("Algorithm configuration:")
+                               .append("  ")
+                               .appendln(algorithm.getUsage());
+               }
+
+               strBuilder
+                       .appendNewLine()
+                       .appendln("Available outputs:");
+
+               if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
+                       strBuilder.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
+               }
+
+               if (algorithm instanceof Hash) {
+                       strBuilder.appendln("  --output hash");
+               }
+
+               if (algorithm instanceof Print) {
+                       strBuilder.appendln("  --output print");
+               }
+
+               return strBuilder
+                       .appendNewLine()
+                       .toString();
+       }
+
+       public static void main(String[] args) throws Exception {
+               // Set up the execution environment
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // should not have any non-Flink data types
+               env.getConfig().disableAutoTypeRegistration();
+               env.getConfig().disableForceAvro();
+               env.getConfig().disableForceKryo();
+
+               ParameterTool parameters = ParameterTool.fromArgs(args);
+               env.getConfig().setGlobalJobParameters(parameters);
+
+               // integration tests run with object reuse both disabled and enabled
+               if (parameters.has("__disable_object_reuse")) {
+                       env.getConfig().disableObjectReuse();
+               } else {
+                       env.getConfig().enableObjectReuse();
+               }
+
+               // Usage
+
+               if (!parameters.has(ALGORITHM)) {
+                       throw new ProgramParametrizationException(getAlgorithmsListing());
+               }
+
+               String algorithmName = parameters.get(ALGORITHM);
+               Driver algorithm = driverFactory.get(algorithmName);
+
+               if (algorithm == null) {
+                       throw new ProgramParametrizationException("Unknown algorithm name: " + algorithmName);
+               }
+
+               if (!parameters.has(INPUT)) {
+                       if (!parameters.has(OUTPUT)) {
+                               // if neither input nor output is given then print algorithm usage
+                               throw new ProgramParametrizationException(getAlgorithmUsage(algorithmName));
+                       }
+                       throw new ProgramParametrizationException("No input given");
+               }
+
+               String inputName = parameters.get(INPUT);
+               Input input = inputFactory.get(inputName);
+
+               if (input == null) {
+                       throw new ProgramParametrizationException("Unknown input type: " + inputName);
+               }
+
+               // Input
+
+               input.configure(parameters);
+               Graph graph = input.create(env);
+
+               // Algorithm
+
+               algorithm.configure(parameters);
+               algorithm.plan(graph);
+
+               // Output
+               if (!parameters.has(OUTPUT)) {
+                       throw new ProgramParametrizationException("No output given");
+               }
+
+               String outputName = parameters.get(OUTPUT);
+               String executionNamePrefix = input.getIdentity() + " -> " + algorithmName + " -> ";
+
+               System.out.println();
+
+               switch (outputName.toLowerCase()) {
+                       case "csv":
+                               if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
+                                       String filename = parameters.getRequired("output_filename");
+
+                                       String lineDelimiter = StringEscapeUtils.unescapeJava(
+                                               parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+                                       String fieldDelimiter = StringEscapeUtils.unescapeJava(
+                                               parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+                                       org.apache.flink.graph.drivers.output.CSV c = (org.apache.flink.graph.drivers.output.CSV) algorithm;
+                                       c.writeCSV(filename, lineDelimiter, fieldDelimiter);
+
+                                       env.execute(executionNamePrefix + "CSV");
+                               } else {
+                                       throw new ProgramParametrizationException("Algorithm does not support output type 'CSV'");
+                               }
+                               break;
+
+                       case "hash":
+                               if (algorithm instanceof Hash) {
+                                       Hash h = (Hash) algorithm;
+                                       h.hash(executionNamePrefix + "Hash");
+                               } else {
+                                       throw new ProgramParametrizationException("Algorithm does not support output type 'hash'");
+                               }
+                               break;
+
+                       case "print":
+                               if (algorithm instanceof Print) {
+                                       Print h = (Print) algorithm;
+                                       h.print(executionNamePrefix + "Print");
+                               } else {
+                                       throw new ProgramParametrizationException("Algorithm does not support output type 'print'");
+                               }
+                               break;
+
+                       default:
+                               throw new ProgramParametrizationException("Unknown output type: " + outputName);
+               }
+       }
+
+       /**
+        * Stores a list of classes for which an instance can be requested by name
+        * and implements an iterator over class instances.
+        *
+        * @param <T> base type for stored classes
+        */
+       private static class ParameterizedFactory<T extends Parameterized>
+       implements Iterable<T> {
+               private List<Class<? extends T>> classes = new ArrayList<>();
+
+               /**
+                * Add a class to the factory.
+                *
+                * @param cls subclass of T
+                * @return this
+                */
+               public ParameterizedFactory<T> addClass(Class<? extends T> cls) {
+                       this.classes.add(cls);
+                       return this;
+               }
+
+               /**
+                * Obtain a class instance by name.
+                *
+                * @param name String matching getName()
+                * @return class instance or null if no matching class
+                */
+               public T get(String name) {
+                       for (T instance : this) {
+                               if (name.equals(instance.getName())) {
+                                       return instance;
+                               }
+                       }
+
+                       return null;
+               }
+
+               @Override
+               public Iterator<T> iterator() {
+                       return new Iterator<T>() {
+                               private int index;
+
+                               @Override
+                               public boolean hasNext() {
+                                       return index < classes.size();
+                               }
+
+                               @Override
+                               public T next() {
+                                       return InstantiationUtil.instantiate(classes.get(index++));
+                               }
+
+                               @Override
+                               public void remove() {
+                                       throw new UnsupportedOperationException();
+                               }
+                       };
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
deleted file mode 100644
index 642fe5b..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.flink.client.program.ProgramParametrizationException;
-
-/**
- * This default main class prints usage listing available classes.
- */
-public class Usage {
-
-       private static final Class[] DRIVERS = new Class[]{
-               org.apache.flink.graph.drivers.ClusteringCoefficient.class,
-               org.apache.flink.graph.drivers.GraphMetrics.class,
-               org.apache.flink.graph.drivers.HITS.class,
-               org.apache.flink.graph.drivers.JaccardIndex.class,
-               org.apache.flink.graph.drivers.TriangleListing.class,
-       };
-
-       private static final Class[] EXAMPLES = new Class[]{
-               org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
-               org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
-               org.apache.flink.graph.examples.IncrementalSSSP.class,
-               org.apache.flink.graph.examples.MusicProfiles.class,
-               org.apache.flink.graph.examples.PregelSSSP.class,
-               org.apache.flink.graph.examples.SingleSourceShortestPaths.class,
-               org.apache.flink.graph.scala.examples.ConnectedComponents.class,
-               org.apache.flink.graph.scala.examples.GSASingleSourceShortestPaths.class,
-               org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
-       };
-
-       private static String getUsage() {
-               StrBuilder strBuilder = new StrBuilder();
-
-               strBuilder.appendNewLine();
-               strBuilder.appendln("Driver classes call algorithms from the Gelly library:");
-               for (Class cls : DRIVERS) {
-                       strBuilder.append("  ").appendln(cls.getName());
-               }
-
-               strBuilder.appendNewLine();
-               strBuilder.appendln("Example classes illustrate Gelly APIs or alternative algorithms:");
-               for (Class cls : EXAMPLES) {
-                       strBuilder.append("  ").appendln(cls.getName());
-               }
-
-               return strBuilder.toString();
-       }
-
-       public static void main(String[] args) throws Exception {
-               // this exception is throw to prevent Flink from printing an error message
-               throw new ProgramParametrizationException(getUsage());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
new file mode 100644
index 0000000..a48fdf1
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.drivers.DriverBaseITCase;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class RunnerITCase
+extends DriverBaseITCase {
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       public RunnerITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testWithoutAlgorithm() throws Exception {
+               String expected = "Select an algorithm to view usage:";
+
+               thrown.expect(ProgramParametrizationException.class);
+               thrown.expectMessage(expected);
+
+               expectedOutput(new String[]{}, expected);
+       }
+
+       @Test
+       public void testWithUnknownAlgorithm() throws Exception {
+               String expected = "Unknown algorithm name: NotAnAlgorithm";
+
+               thrown.expect(ProgramParametrizationException.class);
+               thrown.expectMessage(expected);
+
+               expectedOutput(new String[]{"--algorithm", "NotAnAlgorithm"}, expected);
+       }
+
+       @Test
+       public void testAlgorithmUsage() throws Exception {
+               String expected = "Pass-through of the graph's edge list.";
+
+               thrown.expect(ProgramParametrizationException.class);
+               thrown.expectMessage(expected);
+
+               expectedOutput(new String[]{"--algorithm", "EdgeList"}, expected);
+       }
+
+       @Test
+       public void testWithoutInput() throws Exception {
+               String expected = "No input given";
+
+               thrown.expect(ProgramParametrizationException.class);
+               thrown.expectMessage(expected);
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--output", "NotAnOutput"},
+                       expected);
+       }
+
+       @Test
+       public void testWithUnknownInput() throws Exception {
+               String expected = "Unknown input type: NotAnInput";
+
+               thrown.expect(ProgramParametrizationException.class);
+               thrown.expectMessage(expected);
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "NotAnInput"},
+                       expected);
+       }
+
+       @Test
+       public void testWithoutOutput() throws Exception {
+               String expected = "No output given";
+
+               thrown.expect(ProgramParametrizationException.class);
+               thrown.expectMessage(expected);
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph"},
+                       expected);
+       }
+
+       @Test
+       public void testWithUnknownOutput() throws Exception {
+               String expected = "Unknown output type: NotAnOutput";
+
+               thrown.expect(ProgramParametrizationException.class);
+               thrown.expectMessage(expected);
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph",
+                               "--output", "NotAnOutput"},
+                       expected);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
new file mode 100644
index 0000000..400c241
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AdamicAdarITCase
+extends DriverBaseITCase {
+
+       public AdamicAdarITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new AdamicAdar().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "AdamicAdar"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "AdamicAdar",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "print"},
+                       221628);
+       }
+}
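
Note on testLongDescription above: the regexSubstring helper it calls is defined in DriverBaseITCase later in this commit as "(?s).*" + Pattern.quote(input) + ".*". What follows is a minimal stand-alone sketch of why both the quoting and the (?s) flag matter when the expected text is a multi-line usage message. The class name RegexSubstringSketch and the sample strings are made up for illustration and are not part of the commit.

```java
import java.util.regex.Pattern;

// Hypothetical stand-alone class; only regexSubstring mirrors the helper in the diff.
final class RegexSubstringSketch {

    // Same construction as DriverBaseITCase#regexSubstring.
    static String regexSubstring(String input) {
        return "(?s).*" + Pattern.quote(input) + ".*";
    }

    public static void main(String[] args) {
        // Made-up stand-in for a multi-line exception message.
        String output = "usage:\n  cost is 1+1 (approx.)\n";

        // Quoted: "+", "(" and "." are literals, and (?s) lets ".*" cross newlines.
        System.out.println(output.matches(regexSubstring("1+1 (approx.)")));

        // Unquoted: "1+1" means "one or more 1s, then a 1", so the literal text does not match.
        System.out.println(output.matches("(?s).*1+1.*"));

        // Without (?s): "." stops at line terminators, so the multi-line string does not match.
        System.out.println(output.matches(".*" + Pattern.quote("1+1") + ".*"));
    }
}
```

String.matches requires the whole input to match, which is why the helper pads the quoted text with ".*" on both sides.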

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
new file mode 100644
index 0000000..f215b91
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ClusteringCoefficientITCase
+extends DriverBaseITCase {
+
+       public ClusteringCoefficientITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new ClusteringCoefficient().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "ClusteringCoefficient"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+               String expected = "\n" +
+                       "ChecksumHashCode 0x000001c0409df6c0, count 902\n" +
+                       "triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
+                       "vertex count: 902, average clustering coefficient: 0.32943748[0-9]+\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                               "--output", "print"},
+                       904);
+       }
+
+       @Test
+       public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+               String expected = "\n" +
+                       "ChecksumHashCode 0x000001ccf8c45fdb, count 902\n" +
+                       "triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
+                       "vertex count: 902, average clustering coefficient: 0.42173070[0-9]+\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "print"},
+                       904);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..b91abb3
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ConnectedComponentsITCase
+extends DriverBaseITCase {
+
+       public ConnectedComponentsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new ConnectedComponents().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "ConnectedComponents"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testHashWithRMatIntegerGraph() throws Exception {
+               String expected = "\\nChecksumHashCode 0x0000000000cdc7e7, count 838\\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "ConnectedComponents",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
+                                       "--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "ConnectedComponents",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
+                                       "--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
+                               "--output", "print"},
+                       838);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
new file mode 100644
index 0000000..d19ca97
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.graph.Runner;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runners.Parameterized;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.regex.Pattern;
+
+public abstract class DriverBaseITCase
+extends MultipleProgramsTestBase {
+
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
+       protected DriverBaseITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       // extend MultipleProgramsTestBase default to include object reuse mode
+       @Parameterized.Parameters(name = "Execution mode = {0}")
+       public static Collection<Object[]> executionModes() {
+               return Arrays.asList(
+                       new Object[] { TestExecutionMode.CLUSTER },
+                       new Object[] { TestExecutionMode.CLUSTER_OBJECT_REUSE },
+                       new Object[] { TestExecutionMode.COLLECTION });
+       }
+
+       /**
+        * Simpler variant of {@link #expectedOutput(String[], String)}
+        * that only compares the number of records in standard output.
+        * This is intended for algorithms where the result cannot be
+        * hashed due to approximate results (typically floating point arithmetic).
+        *
+        * @param parameters algorithm, input, and output arguments
+        * @param records expected number of records in standard output
+        * @throws Exception on error
+        */
+       protected void expectedCount(String[] parameters, int records) throws Exception {
+               String output = getSystemOutput(parameters);
+
+               // subtract the extra newline
+               int numberOfRecords = output.split(System.getProperty("line.separator")).length - 1;
+               Assert.assertEquals(records, numberOfRecords);
+       }
+
+       /**
+        * Executes the driver with the provided arguments and compares the
+        * standard output with the given regular expression.
+        *
+        * @param parameters algorithm, input, and output arguments
+        * @param expected expected standard output
+        * @throws Exception on error
+        */
+       protected void expectedOutput(String[] parameters, String expected) throws Exception {
+               String output = getSystemOutput(parameters);
+
+               Assert.assertThat(output, RegexMatcher.matchesRegex(expected));
+       }
+
+       /**
+        * Executes the driver with the provided arguments and compares the
+        * exception and exception message with the given class and regular
+        * expression.
+        *
+        * @param parameters algorithm, input, and output arguments
+        * @param expected regular expression matching the expected exception message
+        * @param exception expected exception
+        * @throws Exception on error when not matching exception
+        */
+       protected void expectedOutputFromException(String[] parameters, String expected, Class<? extends Throwable> exception) throws Exception {
+               expectedException.expect(exception);
+               expectedException.expectMessage(RegexMatcher.matchesRegex(expected));
+
+               getSystemOutput(parameters);
+       }
+
+       /**
+        * Generate a regular expression string by quoting the input string and
+        * adding wildcard matchers to the beginning and end.
+        *
+        * @param input source string
+        * @return regex string
+        */
+       protected String regexSubstring(String input) {
+               // Pattern.quote disables regex interpretation of the input string and
+               // flag expression "(?s)" (Pattern.DOTALL) matches "." against any
+               // character including line terminators
+               return "(?s).*" + Pattern.quote(input) + ".*";
+       }
+
+       /**
+        * Capture the command-line standard output from the driver execution.
+        *
+        * @param args driver command-line arguments
+        * @return standard output from driver execution
+        * @throws Exception on error
+        */
+       private String getSystemOutput(String[] args) throws Exception {
+               ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+               // Configure object reuse mode
+               switch (mode) {
+                       case CLUSTER:
+                       case COLLECTION:
+                               args = (String[])ArrayUtils.add(args, "--__disable_object_reuse");
+                               break;
+
+                       case CLUSTER_OBJECT_REUSE:
+                               // object reuse is enabled by default when executing drivers
+                               break;
+
+                       default:
+                               throw new FlinkRuntimeException("Unknown execution mode " + mode);
+               }
+
+               // Redirect stdout; restore it even if the driver throws
+               PrintStream stdout = System.out;
+               System.setOut(new PrintStream(output));
+               try {
+                       Runner.main(args);
+               } finally {
+                       System.setOut(stdout);
+               }
+
+               return output.toString();
+       }
+
+       /**
+        * Implements a Hamcrest regex matcher. Hamcrest 2.0 provides
+        * Matchers.matchesPattern(String) but Flink depends on Hamcrest 1.3.
+        *
+        * see http://stackoverflow.com/a/25021229
+        */
+       private static class RegexMatcher
+       extends TypeSafeMatcher<String> {
+               private final String regex;
+
+               private RegexMatcher(final String regex) {
+                       this.regex = regex;
+               }
+
+               @Override
+               public void describeTo(final Description description) {
+                       description.appendText("matches regex=`" + regex + "`");
+               }
+
+               @Override
+               public boolean matchesSafely(final String string) {
+                       return string.matches(regex);
+               }
+
+               public static RegexMatcher matchesRegex(final String regex) {
+                       return new RegexMatcher(regex);
+               }
+       }
+}
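
Note on getSystemOutput above: it swaps System.out for an in-memory stream around Runner.main, and several tests in this commit expect Runner.main to throw. A minimal sketch of that capture pattern in isolation follows, with the restore placed in a finally block so that a throwing body cannot leave stdout redirected. The class StdoutCaptureSketch and its capture method are hypothetical names used only for illustration; they are not part of the commit.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical helper, not part of the commit.
final class StdoutCaptureSketch {

    // Runs body with System.out redirected to a buffer and returns what was printed.
    static String capture(Runnable body) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream original = System.out;
        System.setOut(new PrintStream(buffer, true));
        try {
            body.run();
        } finally {
            // Flush the temporary stream, then restore the original even on failure.
            System.out.flush();
            System.setOut(original);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String captured = capture(() -> System.out.print("hello"));
        // Printed on the restored stdout.
        System.out.println("captured: " + captured);
    }
}
```

If the body throws, the exception still propagates to the caller, so a JUnit ExpectedException rule like the one in DriverBaseITCase would see it unchanged.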

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
new file mode 100644
index 0000000..d9cac8b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class EdgeListITCase
+extends DriverBaseITCase {
+
+       public EdgeListITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new EdgeList().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "EdgeList"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testHashWithCompleteGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x0000000006788c22, count 1722\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "CompleteGraph", "--vertex_count", "42",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithCycleGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x000000000050cea4, count 84\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "CycleGraph", "--vertex_count", "42",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithEmptyGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x0000000000000000, count 0\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "EmptyGraph", "--vertex_count", "42",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithGridGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x00000000357d33a6, count 2990\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "GridGraph", "--dim0", "5:true", "--dim1", "8:false", "--dim2", "13:true",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithHypercubeGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x0000000014a72800, count 2048\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "HypercubeGraph", "--dimensions", "8",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithPathGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x00000000004ee21a, count 82\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "PathGraph", "--vertex_count", "42",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatIntegerGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x00000000ed469103, count 16384\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "integer",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatIntegerDirectedGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x00000000c53bfc9b, count 12009\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatIntegerUndirectedGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x00000001664eb9e4, count 20884\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatLongGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x0000000116ee9103, count 16384\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "long",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testPrintWithRMatLongGraph() throws Exception {
+
+               expectedCount(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "long",
+                               "--output", "print"},
+                       16384);
+       }
+
+       @Test
+       public void testHashWithRMatLongDirectedGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x00000000e3c4643b, count 12009\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "long", "--simplify", "directed",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatLongUndirectedGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x000000019b67ae64, count 20884\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "long", "--simplify", "undirected",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatStringGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x00000071dc80a623, count 16384\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "string",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatStringDirectedGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x0000005d58b3fa7d, count 12009\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "string", "--simplify", "directed",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithRMatStringUndirectedGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x000000aa54987304, count 20884\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "RMatGraph", "--type", "string", "--simplify", "undirected",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithSingletonEdgeGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x0000000001af8ee8, count 200\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "SingletonEdgeGraph", "--vertex_pair_count", "100",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testHashWithStarGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x000000000042789a, count 82\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "EdgeList",
+                               "--input", "StarGraph", "--vertex_count", "42",
+                               "--output", "hash"},
+                       expected);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
new file mode 100644
index 0000000..a5ea486
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphMetricsITCase
+extends DriverBaseITCase {
+
+       public GraphMetricsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new GraphMetrics().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "GraphMetrics"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testWithDirectedRMatIntegerGraph() throws Exception {
+               String expected = "\n" +
+                       "Vertex metrics:\n" +
+                       "  vertex count: 902\n" +
+                       "  edge count: 12,009\n" +
+                       "  unidirectional edge count: 8,875\n" +
+                       "  bidirectional edge count: 1,567\n" +
+                       "  average degree: 13.314\n" +
+                       "  density: 0.01477663\n" +
+                       "  triplet count: 1,003,442\n" +
+                       "  maximum degree: 463\n" +
+                       "  maximum out degree: 334\n" +
+                       "  maximum in degree: 342\n" +
+                       "  maximum triplets: 106,953\n" +
+                       "\n" +
+                       "Edge metrics:\n" +
+                       "  triangle triplet count: 107,817\n" +
+                       "  rectangle triplet count: 315,537\n" +
+                       "  maximum triangle triplets: 820\n" +
+                       "  maximum rectangle triplets: 3,822\n";
+
+               String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "directed",
+                       "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                       "--output"};
+
+               expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
+               expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+       }
+
+       @Test
+       public void testWithUndirectedRMatIntegerGraph() throws Exception {
+               String expected = "\n" +
+                       "Vertex metrics:\n" +
+                       "  vertex count: 902\n" +
+                       "  edge count: 10,442\n" +
+                       "  average degree: 23.153\n" +
+                       "  density: 0.025697\n" +
+                       "  triplet count: 1,003,442\n" +
+                       "  maximum degree: 463\n" +
+                       "  maximum triplets: 106,953\n" +
+                       "\n" +
+                       "Edge metrics:\n" +
+                       "  triangle triplet count: 107,817\n" +
+                       "  rectangle triplet count: 315,537\n" +
+                       "  maximum triangle triplets: 820\n" +
+                       "  maximum rectangle triplets: 3,822\n";
+
+               String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "undirected",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output"};
+
+               expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
+               expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
new file mode 100644
index 0000000..5474d1b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HITSITCase
+extends DriverBaseITCase {
+
+       public HITSITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new HITS().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "HITS"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "HITS",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                               "--output", "print"},
+                       902);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
new file mode 100644
index 0000000..0632856
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JaccardIndexITCase
+extends DriverBaseITCase {
+
+       public JaccardIndexITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new JaccardIndex().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "JaccardIndex"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testHashWithRMatIntegerGraph() throws Exception {
+               String expected = "\nChecksumHashCode 0x0001b188570b2572, count 221628\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "JaccardIndex",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "JaccardIndex",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "print"},
+                       221628);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
new file mode 100644
index 0000000..d7301d0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase
+extends DriverBaseITCase {
+
+       public PageRankITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new PageRank().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "PageRank"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "PageRank",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                               "--output", "print"},
+                       902);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
new file mode 100644
index 0000000..0d2897c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TriangleListingITCase
+extends DriverBaseITCase {
+
+       public TriangleListingITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testLongDescription() throws Exception {
+               String expected = regexSubstring(new TriangleListing().getLongDescription());
+
+               expectedOutputFromException(
+                       new String[]{"--algorithm", "TriangleListing"},
+                       expected,
+                       ProgramParametrizationException.class);
+       }
+
+       @Test
+       public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+               String expected = "\n" +
+                       "ChecksumHashCode 0x0000001beffe6edd, count 75049\n" +
+                       "Triadic census:\n" +
+                       "  003: 113,435,893\n" +
+                       "  012: 6,632,528\n" +
+                       "  102: 983,535\n" +
+                       "  021d: 118,574\n" +
+                       "  021u: 118,566\n" +
+                       "  021c: 237,767\n" +
+                       "  111d: 129,773\n" +
+                       "  111u: 130,041\n" +
+                       "  030t: 16,981\n" +
+                       "  030c: 5,535\n" +
+                       "  201: 43,574\n" +
+                       "  120d: 7,449\n" +
+                       "  120u: 7,587\n" +
+                       "  120c: 15,178\n" +
+                       "  210: 17,368\n" +
+                       "  300: 4,951\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "TriangleListing", "--order", "directed", "--sort_triangle_vertices", "--triadic_census",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "TriangleListing", "--order", "directed",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+                               "--output", "print"},
+                       75049);
+       }
+
+       @Test
+       public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+               String expected = "\n" +
+                       "ChecksumHashCode 0x00000000e6b3f32c, count 75049\n" +
+                       "Triadic census:\n" +
+                       "  03: 113,435,893\n" +
+                       "  12: 7,616,063\n" +
+                       "  21: 778,295\n" +
+                       "  30: 75,049\n";
+
+               expectedOutput(
+                       new String[]{"--algorithm", "TriangleListing", "--order", "undirected", "--sort_triangle_vertices", "--triadic_census",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "hash"},
+                       expected);
+       }
+
+       @Test
+       public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
+               expectedCount(
+                       new String[]{"--algorithm", "TriangleListing", "--order", "undirected",
+                               "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+                               "--output", "print"},
+                       75049);
+       }
+}
