http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 93a96c4..ca0c167 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -18,315 +18,125 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.CopyableValue;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Driver for the library implementation of Triangle Listing.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then lists
- * all triangles.
+ * Driver for directed and undirected triangle listing algorithm and analytic.
  *
  * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.directed.TriadicCensus
  * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus
  */
-public class TriangleListing {
-
-       private static final int DEFAULT_SCALE = 10;
-
-       private static final int DEFAULT_EDGE_FACTOR = 16;
+public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, 
EV>
+extends SimpleDriver<PrintableResult>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
 
-       private static final boolean DEFAULT_TRIADIC_CENSUS = true;
-
-       private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       private static String getUsage(String message) {
-               return new StrBuilder()
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("Lists all triangles in a 
graph.", 80))
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("This algorithm returns tuples 
containing the vertex IDs for each triangle and" +
-                               " for directed graphs a bitmask indicating the 
presence of the six potential connecting edges.", 80))
-                       .appendNewLine()
-                       .appendln("usage: TriangleListing --directed <true | 
false> [--triadic_census <true | false>] --input <csv | rmat> --output <print | 
hash | csv>")
-                       .appendNewLine()
-                       .appendln("options:")
-                       .appendln("  --input csv --type <integer | string> 
[--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter 
LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-                       .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
-                       .appendNewLine()
-                       .appendln("  --output print")
-                       .appendln("  --output hash")
-                       .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
-                       .appendNewLine()
-                       .appendln("Usage error: " + message)
-                       .toString();
-       }
+       private static final String DIRECTED = "directed";
 
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
+       private static final String UNDIRECTED = "undirected";
 
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               env.getConfig().setGlobalJobParameters(parameters);
+       private ChoiceParameter order = new ChoiceParameter(this, "order")
+               .addChoices(DIRECTED, UNDIRECTED);
 
-               if (! parameters.has("directed")) {
-                       throw new 
ProgramParametrizationException(getUsage("must declare execution mode as 
'--directed true' or '--directed false'"));
-               }
-               boolean directedAlgorithm = parameters.getBoolean("directed");
+       private BooleanParameter sortTriangleVertices = new 
BooleanParameter(this, "sort_triangle_vertices");
 
-               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-               boolean triadic_census = 
parameters.getBoolean("triadic_census", DEFAULT_TRIADIC_CENSUS);
+       private BooleanParameter computeTriadicCensus = new 
BooleanParameter(this, "triadic_census");
 
-               GraphAnalytic tc = null;
-               DataSet tl;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.getRequired("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               Graph<LongValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(LongValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>()
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
-
-                                                       if (triadic_census) {
-                                                               tc = graph
-                                                                       
.run(new 
org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, 
NullValue, NullValue>()
-                                                                               
.setLittleParallelism(little_parallelism));
-                                                       }
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               } else {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false)
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
-
-                                                       if (triadic_census) {
-                                                               tc = graph
-                                                                       
.run(new 
org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, 
NullValue, NullValue>()
-                                                                               
.setLittleParallelism(little_parallelism));
-                                                       }
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                       } break;
-
-                                       case "string": {
-                                               Graph<StringValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(StringValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>()
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
-
-                                                       if (triadic_census) {
-                                                               tc = graph
-                                                                       
.run(new 
org.apache.flink.graph.library.clustering.directed.TriadicCensus<StringValue, 
NullValue, NullValue>()
-                                                                               
.setLittleParallelism(little_parallelism));
-                                                       }
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, 
NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               } else {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false)
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
-
-                                                       if (triadic_census) {
-                                                               tc = graph
-                                                                       
.run(new 
org.apache.flink.graph.library.clustering.undirected.TriadicCensus<StringValue, 
NullValue, NullValue>()
-                                                                               
.setLittleParallelism(little_parallelism));
-                                                       }
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                       } break;
-
-                                       default:
-                                               throw new 
ProgramParametrizationException(getUsage("invalid CSV type"));
-                               }
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
 
+       private GraphAnalytic<K, VV, EV, ? extends PrintableResult> 
triadicCensus;
 
-                       } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .generate();
-
-                               if (directedAlgorithm) {
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> simpleGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>()
-                                                               
.setParallelism(little_parallelism));
-
-                                               if (triadic_census) {
-                                                       tc = simpleGraph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, 
NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                               tl = simpleGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } else {
-                                               Graph<LongValue, NullValue, 
NullValue> simpleGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>()
-                                                               
.setParallelism(little_parallelism));
-
-                                               if (triadic_census) {
-                                                       tc = simpleGraph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, 
NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                               tl = simpleGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       }
-                               } else {
-                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
+       }
 
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> simpleGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip)
-                                                               
.setParallelism(little_parallelism));
+       @Override
+       public String getShortDescription() {
+               return "list triangles";
+       }
 
-                                               if (triadic_census) {
-                                                       tc = simpleGraph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, 
NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                               tl = simpleGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } else {
-                                               Graph<IntValue, NullValue, 
NullValue> simpleGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
-                                                               
.setParallelism(little_parallelism))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip)
-                                                               
.setParallelism(little_parallelism));
+       @Override
+       public String getLongDescription() {
+               return WordUtils.wrap(new StrBuilder()
+                       .appendln("List all triangles graph.")
+                       .appendNewLine()
+                       .append("The algorithm result contains three vertex 
IDs. For the directed algorithm " +
+                               "the result contains an additional bitmask 
indicating the presence of the six " +
+                               "potential connecting edges.")
+                       .toString(), 80);
+       }
 
-                                               if (triadic_census) {
-                                                       tc = simpleGraph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriadicCensus<IntValue, 
NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                               tl = simpleGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       }
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               int lp = littleParallelism.getValue().intValue();
+
+               switch (order.getValue()) {
+                       case DIRECTED:
+                               result = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
+                                               
.setSortTriangleVertices(sortTriangleVertices.getValue())
+                                               .setLittleParallelism(lp));
+
+                               if (computeTriadicCensus.getValue()) {
+                                       triadicCensus = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
+                                                       
.setLittleParallelism(lp));
                                }
-                       } break;
+                               break;
 
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid input type"));
-               }
+                       case UNDIRECTED:
+                               result = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, 
EV>()
+                                               
.setSortTriangleVertices(sortTriangleVertices.getValue())
+                                               .setLittleParallelism(lp));
 
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               System.out.println();
-                               if (directedAlgorithm) {
-                                       for (Object e: tl.collect()) {
-                                               
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result 
result =
-                                                       
(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
-                                               
System.out.println(result.toPrintableString());
-                                       }
-                               } else {
-                                       tl.print();
+                               if (computeTriadicCensus.getValue()) {
+                                       triadicCensus = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
+                                                       
.setLittleParallelism(lp));
                                }
                                break;
+               }
+       }
 
-                       case "hash":
-                               System.out.println();
-                               
System.out.println(DataSetUtils.checksumHashCode(tl));
-                               break;
-
-                       case "csv":
-                               String filename = 
parameters.getRequired("output_filename");
-
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+       @Override
+       public void hash(String executionName) throws Exception {
+               super.hash(executionName);
+               printAnalytics();
+       }
 
-                               tl.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
+       @Override
+       public void print(String executionName) throws Exception {
+               super.print(executionName);
+               printAnalytics();
+       }
 
-                               env.execute();
-                               break;
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid output type"));
-               }
+       @Override
+       public void writeCSV(String filename, String lineDelimiter, String 
fieldDelimiter) {
+               super.writeCSV(filename, lineDelimiter, fieldDelimiter);
+               printAnalytics();
+       }
 
-               if (tc != null) {
+       private void printAnalytics() {
+               if (computeTriadicCensus.getValue()) {
                        System.out.print("Triadic census:\n  ");
-                       
System.out.println(tc.getResult().toString().replace(";", "\n "));
+                       
System.out.println(triadicCensus.getResult().toPrintableString().replace(";", 
"\n "));
                }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
new file mode 100644
index 0000000..e9d648a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence.Value;
+
+/**
+ * Iterative algorithms which converge can be terminated with a maximum number
+ * of iterations or a convergence threshold which stops computation when the
+ * total change in scores is below a given delta.
+ *
+ * If the command-line configuration specifies neither a number of iterations
+ * nor a convergence threshold then a default number of iterations is used
+ * with an infinite convergence threshold. Otherwise, when either value is
+ * configured then an unset value is set to infinity.
+ */
+public class IterationConvergence
+implements Parameter<Value> {
+
+       private final int defaultIterations;
+
+       private final Value value = new Value();
+
+       /**
+        * Add this parameter to the list of parameters stored by owner.
+        *
+        * @param owner the {@link Parameterized} using this {@link Parameter}
+        * @param defaultIterations the default number of iterations if neither
+        *                          the number of iterations nor the convergence
+        *                          threshold are specified
+        */
+       public IterationConvergence(ParameterizedBase owner, int 
defaultIterations) {
+               owner.addParameter(this);
+               this.defaultIterations = defaultIterations;
+       }
+
+       @Override
+       public String getUsage() {
+               return "[--iterations ITERATIONS] [--convergence_threshold 
CONVERGENCE_THRESHOLD]";
+       }
+
+       @Override
+       public void configure(ParameterTool parameterTool) {
+               if (!parameterTool.has("iterations") && 
!parameterTool.has("convergence_threshold")) {
+                       // no configuration so use default iterations and 
maximum threshold
+                       value.iterations = defaultIterations;
+                       value.convergenceThreshold = Double.MAX_VALUE;
+               } else {
+                       // use configured values and maximum default for unset 
values
+                       value.iterations = parameterTool.getInt("iterations", 
Integer.MAX_VALUE);
+                       Util.checkParameter(value.iterations > 0,
+                               "iterations must be greater than zero");
+
+                       value.convergenceThreshold = 
parameterTool.getDouble("convergence_threshold", Double.MAX_VALUE);
+                       Util.checkParameter(value.convergenceThreshold > 0,
+                               "convergence threshold must be greater than 
zero");
+               }
+       }
+
+       @Override
+       public Value getValue() {
+               return value;
+       }
+
+       /**
+        * Encapsulate the number of iterations and the convergence threshold.
+        */
+       public static class Value {
+               public int iterations;
+               public double convergenceThreshold;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
deleted file mode 100644
index 6651739..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.library.GSAConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in {@link 
org.apache.flink.graph.library}. 
- * 
- * In particular, this example uses the {@link GSAConnectedComponents}
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 with and 1-3.
- *
- * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link ConnectedComponentsDefaultData}
- */
-public class ConnectedComponents implements ProgramDescription {
-
-       @SuppressWarnings("serial")
-       public static void main(String [] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new MapFunction<Long, Long>() {
-                       @Override
-                       public Long map(Long value) throws Exception {
-                               return value;
-                       }
-               }, env);
-
-               DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-                               .run(new GSAConnectedComponents<Long, Long, 
NullValue>(maxIterations));
-
-               // emit result
-               if (fileOutput) {
-                       verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
-
-                       // since file sinks are lazy, we trigger the execution 
explicitly
-                       env.execute("Connected Components Example");
-               } else {
-                       verticesWithMinIds.print();
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Connected Components Example";
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String edgeInputPath = null;
-       private static String outputPath = null;
-       private static Integer maxIterations = 
ConnectedComponentsDefaultData.MAX_ITERATIONS;
-
-       private static boolean parseParameters(String [] args) {
-               if(args.length > 0) {
-                       if(args.length != 3) {
-                               System.err.println("Usage ConnectedComponents 
<edge path> <output path> " +
-                                               "<num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       edgeInputPath = args[0];
-                       outputPath = args[1];
-                       maxIterations = Integer.parseInt(args[2]);
-
-               } else {
-                       System.out.println("Executing ConnectedComponents 
example with default parameters and built-in default data.");
-                       System.out.println("Provide parameters to read input 
data from files.");
-                       System.out.println("Usage ConnectedComponents <edge 
path> <output path> " +
-                                       "<num iterations>");
-               }
-
-               return true;
-       }
-
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
-
-               if(fileOutput) {
-                       return env.readCsvFile(edgeInputPath)
-                                       .ignoreComments("#")
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
-                                               @Override
-                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
-                                                       return new 
Edge<>(value.f0, value.f1, NullValue.getInstance());
-                                               }
-                                       });
-               } else {
-                       return 
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
index 35f07b0..1cd3549 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
@@ -114,7 +114,7 @@ public class GSASingleSourceShortestPaths implements 
ProgramDescription {
                public Double gather(Neighbor<Double, Double> neighbor) {
                        return neighbor.getNeighborValue() + 
neighbor.getEdgeValue();
                }
-       };
+       }
 
        @SuppressWarnings("serial")
        private static final class ChooseMinDistance extends 
SumFunction<Double, Double, Double> {
@@ -122,7 +122,7 @@ public class GSASingleSourceShortestPaths implements 
ProgramDescription {
                public Double sum(Double newValue, Double currentValue) {
                        return Math.min(newValue, currentValue);
                }
-       };
+       }
 
        @SuppressWarnings("serial")
        private static final class UpdateDistance extends ApplyFunction<Long, 
Double, Double> {

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
new file mode 100644
index 0000000..ae92943
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IterationConvergenceTest
+extends ParameterTestBase {
+
+       private IterationConvergence parameter;
+
+       @Before
+       public void setup() {
+               super.setup();
+
+               parameter = new IterationConvergence(owner, 10);
+       }
+
+       @Test
+       public void testWithIterations() {
+               parameter.configure(ParameterTool.fromArgs(new 
String[]{"--iterations", "42"}));
+               Assert.assertEquals(42, parameter.getValue().iterations);
+               Assert.assertEquals(Double.MAX_VALUE, 
parameter.getValue().convergenceThreshold, 0.000001);
+       }
+
+       @Test
+       public void testWithConvergenceThreshold() {
+               parameter.configure(ParameterTool.fromArgs(new 
String[]{"--convergence_threshold", "42"}));
+               Assert.assertEquals(Integer.MAX_VALUE, 
parameter.getValue().iterations);
+               Assert.assertEquals(42.0, 
parameter.getValue().convergenceThreshold, 0.000001);
+       }
+
+       @Test
+       public void testWithBoth() {
+               parameter.configure(ParameterTool.fromArgs(new 
String[]{"--iterations", "42", "--convergence_threshold", "42"}));
+               Assert.assertEquals(42, parameter.getValue().iterations);
+               Assert.assertEquals(42.0, 
parameter.getValue().convergenceThreshold, 0.000001);
+       }
+
+       @Test
+       public void testWithNeither() {
+               parameter.configure(ParameterTool.fromArgs(new String[]{}));
+               Assert.assertEquals(10, parameter.getValue().iterations);
+               Assert.assertEquals(Double.MAX_VALUE, 
parameter.getValue().convergenceThreshold, 0.000001);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
deleted file mode 100644
index d0de8dc..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.examples;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.examples.ConnectedComponents;
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
-
-       private String edgesPath;
-
-       private String resultPath;
-
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       public ConnectedComponentsITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-
-               File edgesFile = tempFolder.newFile();
-               Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, 
Charsets.UTF_8);
-               edgesPath = edgesFile.toURI().toString();
-       }
-
-       @Test
-       public void testConnectedComponentsExample() throws Exception {
-               ConnectedComponents.main(new String[]{edgesPath, resultPath, 
ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
-               expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
-       }
-
-       @After
-       public void after() throws Exception {
-               TestBaseUtils.compareResultsByLinesInMemory(expected, 
resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index cbbfb02..71baaa9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -529,7 +529,13 @@ public class Graph<K, VV, EV> {
 
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertices.getType()).getTypeAt(0);
 
-               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, 
vertices.getType(), null);
+               TypeInformation<NV> valueType;
+
+               if (mapper instanceof ResultTypeQueryable) {
+                       valueType = ((ResultTypeQueryable) 
mapper).getProducedType();
+               } else {
+                       valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, 
vertices.getType(), null);
+               }
 
                TypeInformation<Vertex<K, NV>> returnType = 
(TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
                                Vertex.class, keyType, valueType);
@@ -573,7 +579,13 @@ public class Graph<K, VV, EV> {
 
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
 
-               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, 
edges.getType(), null);
+               TypeInformation<NV> valueType;
+
+               if (mapper instanceof ResultTypeQueryable) {
+                       valueType = ((ResultTypeQueryable) 
mapper).getProducedType();
+               } else {
+                       valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, 
edges.getType(), null);
+               }
 
                TypeInformation<Edge<K, NV>> returnType = 
(TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
                                Edge.class, keyType, keyType, valueType);

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 3cd8f05..959b816 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
 import org.apache.flink.types.NullValue;
 
 /**
@@ -72,7 +73,7 @@ public class ConnectedComponents<K, VV extends 
Comparable<VV>, EV>
                TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) 
graph.getVertices().getType()).getTypeAt(1);
 
                Graph<K, VV, NullValue> undirectedGraph = graph
-                       .mapEdges(new NullValueEdgeMapper<K, EV>())
+                       .mapEdges(new MapTo<Edge<K, EV>, 
NullValue>(NullValue.getInstance()))
                        .getUndirected();
 
                return undirectedGraph.runScatterGatherIteration(

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index 327de73..1680f38 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -29,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
 import org.apache.flink.types.NullValue;
 
 /**
@@ -73,7 +74,7 @@ public class GSAConnectedComponents<K, VV extends 
Comparable<VV>, EV>
                TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) 
graph.getVertices().getType()).getTypeAt(1);
 
                Graph<K, VV, NullValue> undirectedGraph = graph
-                       .mapEdges(new NullValueEdgeMapper<K, EV>())
+                       .mapEdges(new MapTo<Edge<K, EV>, 
NullValue>(NullValue.getInstance()))
                        .getUndirected();
 
                return undirectedGraph.runGatherSumApplyIteration(
@@ -87,7 +88,6 @@ public class GSAConnectedComponents<K, VV extends 
Comparable<VV>, EV>
        //  Connected Components UDFs
        // 
--------------------------------------------------------------------------------------------
 
-       @SuppressWarnings("serial")
        private static final class GatherNeighborIds<VV extends Comparable<VV>>
                extends GatherFunction<VV, NullValue, VV>
                implements ResultTypeQueryable<VV> {
@@ -108,7 +108,6 @@ public class GSAConnectedComponents<K, VV extends 
Comparable<VV>, EV>
                }
        }
 
-       @SuppressWarnings("serial")
        private static final class SelectMinId<VV extends Comparable<VV>>
                extends SumFunction<VV, NullValue, VV>
                implements ResultTypeQueryable<VV> {
@@ -129,7 +128,6 @@ public class GSAConnectedComponents<K, VV extends 
Comparable<VV>, EV>
                }
        }
 
-       @SuppressWarnings("serial")
        private static final class UpdateComponentId<K, VV extends 
Comparable<VV>>
                extends ApplyFunction<K, VV, VV>
                implements ResultTypeQueryable<VV> {

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 96e5afc..0064a68 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
 import org.apache.flink.types.NullValue;
 
 import java.util.HashMap;
@@ -76,7 +77,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, 
EV>
                TypeInformation<VV> valueType = ((TupleTypeInfo<?>) 
input.getVertices().getType()).getTypeAt(1);
                // iteratively adopt the most frequent label among the 
neighbors of each vertex
                return input
-                       .mapEdges(new NullValueEdgeMapper<K, EV>())
+                       .mapEdges(new MapTo<Edge<K, EV>, 
NullValue>(NullValue.getInstance()))
                        .runScatterGatherIteration(
                                new SendNewLabelToNeighbors<K, VV>(valueType), 
new UpdateVertexLabel<K, VV>(), maxIterations)
                        .getVertices();

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 236272f..6fe753a 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -35,9 +35,9 @@ import org.apache.flink.graph.EdgeOrder;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import 
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.TertiaryResult;
+import 
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 9aca8a4..eda5c1c 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -32,8 +32,8 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.asm.result.UnaryResult;
+import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index 57743e8..1dfa3ee 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -40,6 +40,8 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
 import org.apache.flink.graph.library.link_analysis.PageRank.Result;
 import org.apache.flink.graph.utils.GraphUtils;
@@ -500,7 +502,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
         * @param <T> ID type
         */
        public static class Result<T>
-       extends Tuple2<T, DoubleValue> {
+       extends Tuple2<T, DoubleValue>
+       implements PrintableResult, UnaryResult<T> {
                public static final int HASH_SEED = 0x4010af29;
 
                private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
@@ -518,7 +521,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        return f1;
                }
 
-               public String toVerboseString() {
+               @Override
+               public String toPrintableString() {
                        return "Vertex ID: " + getVertexId0()
                                + ", PageRank score: " + getPageRankScore();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 7d77541..6aaf9f2 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -34,8 +34,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
-import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.BinaryResult;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 3b36715..0c80e6d 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
-import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.BinaryResult;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 2e0dffc..78fb378 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -20,7 +20,10 @@ package org.apache.flink.graph.utils;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.LongValue;
 
 import static 
org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
@@ -62,7 +65,7 @@ public class GraphUtils {
         * @param <O> output type
         */
        public static class MapTo<I, O>
-       implements MapFunction<I, O> {
+       implements MapFunction<I, O>, ResultTypeQueryable<O> {
                private final O value;
 
                /**
@@ -78,6 +81,11 @@ public class GraphUtils {
                public O map(I o) throws Exception {
                        return value;
                }
+
+               @Override
+               public TypeInformation<O> getProducedType() {
+                       return 
(TypeInformation<O>)TypeExtractor.createTypeInfo(value.getClass());
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
deleted file mode 100644
index 2bd4719..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-public class NullValueEdgeMapper<K, EV> implements     MapFunction<Edge<K, 
EV>, NullValue> {
-
-       private static final long serialVersionUID = 1L;
-
-       public NullValue map(Edge<K, EV> edge) {
-               return NullValue.getInstance();
-       }
-}

Reply via email to