Repository: flink
Updated Branches:
  refs/heads/master c4783c856 -> c4f9f0d78


http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
deleted file mode 100644
index 615d765..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Driver for the library implementations of Global and Local Clustering Coefficient.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then calculates
- * the local clustering coefficient for each vertex and the global clustering
- * coefficient for the graph.
- *
- * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
- */
-public class ClusteringCoefficient {
-
-       public static final int DEFAULT_SCALE = 10;
-
-       public static final int DEFAULT_EDGE_FACTOR = 16;
-
-       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       private static void printUsage() {
-               System.out.println(WordUtils.wrap("The local clustering 
coefficient measures the connectedness of each" +
-                       " vertex's neighborhood and the global clustering 
coefficient measures the connectedness of the graph." +
-                       " Scores range from 0.0 (no edges between neighbors or 
vertices) to 1.0 (neighborhood or graph" +
-                       " is a clique).", 80));
-               System.out.println();
-               System.out.println(WordUtils.wrap("This algorithm returns 
tuples containing the vertex ID, the degree of" +
-                       " the vertex, and the number of edges between vertex 
neighbors.", 80));
-               System.out.println();
-               System.out.println("usage: ClusteringCoefficient --directed 
<true | false> --input <csv | rmat [options]> --output <print | hash | csv 
[options]>");
-               System.out.println();
-               System.out.println("options:");
-               System.out.println("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]");
-               System.out.println("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]");
-               System.out.println();
-               System.out.println("  --output print");
-               System.out.println("  --output hash");
-               System.out.println("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]");
-       }
-
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-
-               if (! parameters.has("directed")) {
-                       printUsage();
-                       return;
-               }
-               boolean directedAlgorithm = parameters.getBoolean("directed");
-
-               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-               // global and local clustering coefficient results
-               GraphAnalytic gcc;
-               GraphAnalytic acc;
-               DataSet lcc;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.get("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               Graph<LongValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(LongValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               } else {
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                       } break;
-
-                                       case "string": {
-                                               Graph<StringValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(StringValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               } else {
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                       } break;
-
-                                       default:
-                                               printUsage();
-                                               return;
-                               }
-                       } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .setParallelism(little_parallelism)
-                                       .generate();
-
-                               if (directedAlgorithm) {
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>()
-                                                               
.setParallelism(little_parallelism));
-
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } else {
-                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
-                                                               
.setParallelism(little_parallelism))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>()
-                                                               
.setParallelism(little_parallelism));
-
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       }
-                               } else {
-                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip)
-                                                               
.setParallelism(little_parallelism));
-
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } else {
-                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
-                                                               
.setParallelism(little_parallelism))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip)
-                                                               
.setParallelism(little_parallelism));
-
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       }
-                               }
-                       } break;
-
-                       default:
-                               printUsage();
-                               return;
-               }
-
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               if (directedAlgorithm) {
-                                       for (Object e: lcc.collect()) {
-                                               
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result
 result =
-                                                       
(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
-                                               
System.out.println(result.toVerboseString());
-                                       }
-                               } else {
-                                       for (Object e: lcc.collect()) {
-                                               
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result
 result =
-                                                       
(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
-                                               
System.out.println(result.toVerboseString());
-                                       }
-                               }
-                               System.out.println(gcc.getResult());
-                               System.out.println(acc.getResult());
-                               break;
-
-                       case "hash":
-                               
System.out.println(DataSetUtils.checksumHashCode(lcc));
-                               System.out.println(gcc.getResult());
-                               System.out.println(acc.getResult());
-                               break;
-
-                       case "csv":
-                               String filename = 
parameters.get("output_filename");
-
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               lcc.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
-
-                               env.execute("Clustering Coefficient");
-
-                               System.out.println(gcc.getResult());
-                               System.out.println(acc.getResult());
-                               break;
-
-                       default:
-                               printUsage();
-                               return;
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
deleted file mode 100644
index 73bba2c..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-import java.text.NumberFormat;
-
-/**
- * Generate an RMat graph for Graph 500.
- *
- * Note that this does not yet implement permutation of vertex labels or edges.
- *
- * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
- */
-public class Graph500 {
-
-       public static final int DEFAULT_SCALE = 10;
-
-       public static final int DEFAULT_EDGE_FACTOR = 16;
-
-       public static final boolean DEFAULT_SIMPLIFY = false;
-
-       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-
-               // Generate RMat graph
-               int scale = parameters.getInt("scale", DEFAULT_SCALE);
-               int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
-
-               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
-
-               long vertexCount = 1L << scale;
-               long edgeCount = vertexCount * edgeFactor;
-
-               boolean simplify = parameters.getBoolean("simplify", 
DEFAULT_SIMPLIFY);
-               boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
-
-               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                       .generate();
-
-               if (simplify) {
-                       graph = graph.run(new Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
-               }
-
-               DataSet<Tuple2<LongValue,LongValue>> edges = graph
-                       .getEdges()
-                       .project(0, 1);
-
-               // Print, hash, or write RMat graph to disk
-               switch (parameters.get("output", "")) {
-               case "print":
-                       edges.print();
-                       break;
-
-               case "hash":
-                       
System.out.println(DataSetUtils.checksumHashCode(edges));
-                       break;
-
-               case "csv":
-                       String filename = parameters.get("filename");
-
-                       String row_delimiter = parameters.get("row_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
-                       String field_delimiter = 
parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-
-                       edges.writeAsCsv(filename, row_delimiter, 
field_delimiter);
-
-                       env.execute();
-                       break;
-               default:
-                       System.out.println("A Graph500 generator using the 
Recursive Matrix (RMat) graph generator.");
-                       System.out.println();
-                       System.out.println("The graph matrix contains 2^scale 
vertices although not every vertex will");
-                       System.out.println("be represented in an edge. The 
number of edges is edge_factor * 2^scale edges");
-                       System.out.println("although some edges may be 
duplicates.");
-                       System.out.println();
-                       System.out.println("Note: this does not yet implement 
permutation of vertex labels or edges.");
-                       System.out.println();
-                       System.out.println("usage:");
-                       System.out.println("  Graph500 [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output print");
-                       System.out.println("  Graph500 [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output hash");
-                       System.out.println("  Graph500 [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output csv" +
-                                       " --filename FILENAME [--row_delimiter 
ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
-
-                       return;
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
deleted file mode 100644
index e7b47bf..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.examples.utils.ExampleUtils;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.
- *
- * The program reads a graph from an edge file, or creates a random graph when no
- * input file is given, and computes and prints the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * "&lt;sourceVertexID&gt;\t&lt;targetVertexID&gt;".
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- */
-public class GraphMetrics implements ProgramDescription {
-
-       /**
-        * Program entry point.
-        *
-        * @param args empty to run on a generated graph, or exactly one element holding
-        *             the path of the edge file to read
-        * @throws Exception propagated unchanged from the graph metric calls and from
-        *                   {@code env.execute()}
-        */
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-               // create the graph
-               Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
-
-               // get the number of vertices
-               long numVertices = graph.numberOfVertices();
-
-               // get the number of edges
-               long numEdges = graph.numberOfEdges();
-
-               // compute the average node degree: sum all degrees, then divide by the vertex count
-               DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees();
-
-               DataSet<Double> avgNodeDegree = verticesWithDegrees
-                               .aggregate(Aggregations.SUM, 1)
-                               .map(new AvgNodeDegreeMapper(numVertices));
-
-               // find the vertex with the maximum in-degree
-               DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
-
-               // find the vertex with the minimum in-degree
-               DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId());
-
-               // find the vertex with the maximum out-degree
-               DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId());
-
-               // find the vertex with the minimum out-degree
-               DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
-
-               // print the results
-               ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
-               ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
-               ExampleUtils.printResult(avgNodeDegree, "Average node degree");
-               ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
-               ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");
-               ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree");
-               ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree");
-
-               env.execute();
-       }
-
-       /**
-        * Turns the summed vertex degrees (field {@code f1} of the aggregated tuple) into
-        * the average degree by dividing by the number of vertices.
-        */
-       @SuppressWarnings("serial")
-       private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> {
-
-               private final long numberOfVertices;
-
-               public AvgNodeDegreeMapper(long numberOfVertices) {
-                       this.numberOfVertices = numberOfVertices;
-               }
-
-               @Override
-               public Double map(Tuple2<Long, LongValue> sumTuple) {
-                       // Widen to double BEFORE dividing: long / long would silently truncate
-                       // the fractional part of the average.
-                       return (double) sumTuple.f1.getValue() / numberOfVertices;
-               }
-       }
-
-       /**
-        * Keeps only the vertex id (field {@code f0}) of a (vertex id, degree) tuple.
-        */
-       @SuppressWarnings("serial")
-       private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> {
-
-               @Override
-               public Long map(Tuple2<Long, LongValue> value) {
-                       return value.f0;
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Graph Metrics Example";
-       }
-
-       // ******************************************************************************************************************
-       // UTIL METHODS
-       // ******************************************************************************************************************
-
-       /** True when the edges are read from {@link #edgesInputPath} instead of being generated. */
-       private static boolean fileInput = false;
-
-       private static String edgesInputPath = null;
-
-       static final int NUM_VERTICES = 100;
-
-       static final long SEED = 9876;
-
-       /**
-        * Interprets the command line: no argument selects the built-in random graph,
-        * exactly one argument is taken as the edge file path.
-        *
-        * @param args the program arguments
-        * @return {@code false} if more than one argument was given (usage is printed to
-        *         stderr), {@code true} otherwise
-        */
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length == 0) {
-                       System.out.println("Executing Graph Metrics example with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input data from files.");
-                       System.out.println("  See the documentation for the correct format of input files.");
-                       System.out.println("Usage: GraphMetrics <input edges>");
-                       return true;
-               }
-
-               if (args.length != 1) {
-                       System.err.println("Usage: GraphMetrics <input edges>");
-                       return false;
-               }
-
-               fileInput = true;
-               edgesInputPath = args[0];
-               return true;
-       }
-
-       /**
-        * Builds the edge data set, either from the tab-separated edge file or from
-        * {@code ExampleUtils.getRandomEdges} with {@link #NUM_VERTICES} vertices.
-        *
-        * @param env the execution environment used to create the data set
-        * @return the edges of the input graph, all carrying {@link NullValue}
-        */
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-               if (!fileInput) {
-                       return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
-               }
-
-               return env.readCsvFile(edgesInputPath)
-                               .lineDelimiter("\n").fieldDelimiter("\t")
-                               .types(Long.class, Long.class)
-                               .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-                                       @Override
-                                       public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
-                                               return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
-                                       }
-                               });
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
deleted file mode 100644
index f70d5dc..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.directed.Simplify;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Driver for the library implementation of HITS (Hubs and Authorities).
- *
- * This example reads a directed graph from a CSV file, or generates an RMat
- * graph with the given scale and edge factor and simplifies it as a directed
- * graph, then calculates hub and authority scores for each vertex.
- *
- * @see org.apache.flink.graph.library.link_analysis.HITS
- */
-public class HITS {
-
-       public static final int DEFAULT_ITERATIONS = 10;
-
-       public static final int DEFAULT_SCALE = 10;
-
-       public static final int DEFAULT_EDGE_FACTOR = 16;
-
-       /**
-        * Prints the algorithm description and the supported command-line options to
-        * standard output.
-        */
-       private static void printUsage() {
-               System.out.println(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" +
-                       " scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
-                       " and good \"authorities\" are linked from good \"hubs\".", 80));
-               System.out.println();
-               // --iterations is read by main() and therefore belongs in the usage line.
-               System.out.println("usage: HITS [--iterations ITERATIONS] --input <csv | rmat [options]> --output <print | hash | csv [options]>");
-               System.out.println();
-               System.out.println("options:");
-               System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-               System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-               System.out.println();
-               System.out.println("  --output print");
-               System.out.println("  --output hash");
-               System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
-       }
-
-       /**
-        * Program entry point: builds the input graph, runs HITS on it and emits the
-        * scores in the requested output format, followed by the net job runtime.
-        * An unrecognized {@code --input}, {@code --type} or {@code --output} value
-        * prints the usage text and returns without reporting a runtime.
-        *
-        * @param args command-line arguments, parsed with {@link ParameterTool}
-        * @throws Exception propagated unchanged from graph construction and job execution
-        */
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
-
-               // Raw type on purpose: the vertex ID type of the results (LongValue,
-               // StringValue or IntValue) is only chosen at runtime by the branches below.
-               DataSet hits;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       .fromCsvReader(parameters.get("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               .lineDelimiterEdges(lineDelimiter)
-                                               .fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               hits = reader
-                                                       .keyType(LongValue.class)
-                                                       .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-                                       } break;
-
-                                       case "string": {
-                                               hits = reader
-                                                       .keyType(StringValue.class)
-                                                       .run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations));
-                                       } break;
-
-                                       default:
-                                               printUsage();
-                                               return;
-                               }
-                               } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", DEFAULT_SCALE);
-                               int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .generate();
-
-                               if (scale > 32) {
-                                       // More than 2^32 vertex IDs: keep the LongValue IDs as generated.
-                                       hits = graph
-                                               .run(new Simplify<LongValue, NullValue, NullValue>())
-                                               .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-                               } else {
-                                       // At most 2^32 vertex IDs: translate them to IntValue first.
-                                       hits = graph
-                                               .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-                                               .run(new Simplify<IntValue, NullValue, NullValue>())
-                                               .run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
-                               }
-                               } break;
-
-                       default:
-                               printUsage();
-                               return;
-               }
-
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               for (Object e: hits.collect()) {
-                                       System.out.println(((Result)e).toVerboseString());
-                               }
-                               break;
-
-                       case "hash":
-                               System.out.println(DataSetUtils.checksumHashCode(hits));
-                               break;
-
-                       case "csv":
-                               String filename = parameters.get("output_filename");
-
-                               String lineDelimiter = StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-                               // Only this branch calls env.execute() itself.
-                               env.execute();
-                               break;
-                       default:
-                               printUsage();
-                               return;
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
deleted file mode 100644
index 2845e2d..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Driver for the library implementation of Jaccard Index.
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then 
calculates
- * all non-zero Jaccard Index similarity scores between vertices.
- *
- * @see org.apache.flink.graph.library.similarity.JaccardIndex
- */
-public class JaccardIndex {
-
-       public static final int DEFAULT_SCALE = 10;
-
-       public static final int DEFAULT_EDGE_FACTOR = 16;
-
-       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       private static String getUsage(String message) {
-               return new StrBuilder()
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("The Jaccard Index measures 
the similarity between vertex" +
-                               " neighborhoods and is computed as the number 
of shared neighbors divided by the number of" +
-                               " distinct neighbors. Scores range from 0.0 (no 
shared neighbors) to 1.0 (all neighbors are" +
-                               " shared).", 80))
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("This algorithm returns 
4-tuples containing two vertex IDs, the" +
-                               " number of shared neighbors, and the number of 
distinct neighbors.", 80))
-                       .appendNewLine()
-                       .appendln("usage: JaccardIndex --input <csv | rmat 
[options]> --output <print | hash | csv [options]>")
-                       .appendNewLine()
-                       .appendln("options:")
-                       .appendln("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]")
-                       .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
-                       .appendNewLine()
-                       .appendln("  --output print")
-                       .appendln("  --output hash")
-                       .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
-                       .appendNewLine()
-                       .appendln("Usage error: " + message)
-                       .toString();
-       }
-
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-
-               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-               DataSet ji;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.get("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               ji = reader
-                                                       
.keyType(LongValue.class)
-                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } break;
-
-                                       case "string": {
-                                               ji = reader
-                                                       
.keyType(StringValue.class)
-                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, 
NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } break;
-
-                                       default:
-                                               throw new 
ProgramParametrizationException(getUsage("invalid CSV type"));
-                               }
-                               } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .setParallelism(little_parallelism)
-                                       .generate();
-
-                               boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-                               if (scale > 32) {
-                                       ji = graph
-                                               .run(new Simplify<LongValue, 
NullValue, NullValue>(clipAndFlip)
-                                                       
.setParallelism(little_parallelism))
-                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>()
-                                                       
.setLittleParallelism(little_parallelism));
-                               } else {
-                                       ji = graph
-                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
-                                                       
.setParallelism(little_parallelism))
-                                               .run(new Simplify<IntValue, 
NullValue, NullValue>(clipAndFlip)
-                                                       
.setParallelism(little_parallelism))
-                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, 
NullValue>()
-                                                       
.setLittleParallelism(little_parallelism));
-                               }
-                               } break;
-
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid input type"));
-               }
-
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               for (Object e: ji.collect()) {
-                                       Result result = (Result)e;
-                                       
System.out.println(result.toVerboseString());
-                               }
-                               break;
-
-                       case "hash":
-                               
System.out.println(DataSetUtils.checksumHashCode(ji));
-                               break;
-
-                       case "csv":
-                               String filename = 
parameters.get("output_filename");
-
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               ji.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
-
-                               env.execute();
-                               break;
-
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid output type"));
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
deleted file mode 100644
index 43c5eba..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Driver for the library implementation of Triangle Listing.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then lists
- * all triangles.
- *
- * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
- * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
- */
-public class TriangleListing {
-
-       public static final int DEFAULT_SCALE = 10;
-
-       public static final int DEFAULT_EDGE_FACTOR = 16;
-
-       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       private static void printUsage() {
-               System.out.println(WordUtils.wrap("Lists all triangles in a 
graph.", 80));
-               System.out.println();
-               System.out.println(WordUtils.wrap("This algorithm returns 
tuples containing the vertex IDs for each triangle and" +
-                       " for directed graphs a bitmask indicating the presence 
of the six potential connecting edges.", 80));
-               System.out.println();
-               System.out.println("usage: TriangleListing --directed <true | 
false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
-               System.out.println();
-               System.out.println("options:");
-               System.out.println("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]");
-               System.out.println("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]");
-               System.out.println();
-               System.out.println("  --output print");
-               System.out.println("  --output hash");
-               System.out.println("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]");
-       }
-
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               if (! parameters.has("directed")) {
-                       printUsage();
-                       return;
-               }
-               boolean directedAlgorithm = parameters.getBoolean("directed");
-
-               DataSet tl;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.get("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               Graph<LongValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(LongValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>());
-                                               } else {
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
-                                               }
-                                       } break;
-
-                                       case "string": {
-                                               Graph<StringValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(StringValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, 
NullValue, NullValue>());
-                                               } else {
-                                                       tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue,
 NullValue, NullValue>());
-                                               }
-                                       } break;
-
-                                       default:
-                                               printUsage();
-                                               return;
-                               }
-
-
-                       } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .generate();
-
-                               if (directedAlgorithm) {
-                                       if (scale > 32) {
-                                               tl = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>())
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>());
-                                       } else {
-                                               tl = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>())
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, 
NullValue, NullValue>());
-                                       }
-                               } else {
-                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-                                       graph = graph
-                                               .run(new Simplify<LongValue, 
NullValue, NullValue>(clipAndFlip));
-
-                                       if (scale > 32) {
-                                               tl = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip))
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
-                                       } else {
-                                               tl = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip))
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>());
-                                       }
-                               }
-                       } break;
-
-                       default:
-                               printUsage();
-                               return;
-               }
-
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               if (directedAlgorithm) {
-                                       for (Object e: tl.collect()) {
-                                               
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result 
result =
-                                                       
(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
-                                               
System.out.println(result.toVerboseString());
-                                       }
-                               } else {
-                                       tl.print();
-                               }
-                               break;
-
-                       case "hash":
-                               
System.out.println(DataSetUtils.checksumHashCode(tl));
-                               break;
-
-                       case "csv":
-                               String filename = 
parameters.get("output_filename");
-
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               tl.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
-
-                               env.execute();
-                               break;
-                       default:
-                               printUsage();
-                               return;
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
deleted file mode 100644
index b1bc831..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples.utils;
-
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-public class ExampleUtils {
-
-       @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
-       public static void printResult(DataSet set, String msg) {
-               set.output(new PrintingOutputFormatWithMessage(msg) {
-               });
-       }
-
-       public static class PrintingOutputFormatWithMessage<T> implements
-                       OutputFormat<T> {
-
-               private static final long serialVersionUID = 1L;
-
-               private transient PrintStream stream;
-
-               private transient String prefix;
-
-               private String message;
-
-               // 
--------------------------------------------------------------------------------------------
-
-               /**
-                * Instantiates a printing output format that prints to 
standard out.
-                */
-               public PrintingOutputFormatWithMessage() {
-               }
-
-               public PrintingOutputFormatWithMessage(String msg) {
-                       this.message = msg;
-               }
-
-               @Override
-               public void open(int taskNumber, int numTasks) {
-                       // get the target stream
-                       this.stream = System.out;
-
-                       // set the prefix to message
-                       this.prefix = message + ": ";
-               }
-
-               @Override
-               public void writeRecord(T record) {
-                       if (this.prefix != null) {
-                               this.stream.println(this.prefix + 
record.toString());
-                       } else {
-                               this.stream.println(record.toString());
-                       }
-               }
-
-               @Override
-               public void close() {
-                       this.stream = null;
-                       this.prefix = null;
-               }
-
-               @Override
-               public String toString() {
-                       return "Print to System.out";
-               }
-
-               @Override
-               public void configure(Configuration parameters) {
-               }
-       }
-
-       @SuppressWarnings("serial")
-       public static DataSet<Vertex<Long, NullValue>> getVertexIds(
-                       ExecutionEnvironment env, final long numVertices) {
-               return env.generateSequence(1, numVertices).map(
-                               new MapFunction<Long, Vertex<Long, 
NullValue>>() {
-                                       public Vertex<Long, NullValue> map(Long 
l) {
-                                               return new Vertex<Long, 
NullValue>(l, NullValue
-                                                               .getInstance());
-                                       }
-                               });
-       }
-
-       @SuppressWarnings("serial")
-       public static DataSet<Edge<Long, NullValue>> getRandomEdges(
-                       ExecutionEnvironment env, final long numVertices) {
-               return env.generateSequence(1, numVertices).flatMap(
-                               new FlatMapFunction<Long, Edge<Long, 
NullValue>>() {
-                                       @Override
-                                       public void flatMap(Long key, 
Collector<Edge<Long, NullValue>> out) throws Exception {
-                                               int numOutEdges = (int) 
(Math.random() * (numVertices / 2));
-                                               for (int i = 0; i < 
numOutEdges; i++) {
-                                                       long target = (long) 
(Math.random() * numVertices) + 1;
-                                                       out.collect(new 
Edge<Long, NullValue>(key, target,
-                                                                       
NullValue.getInstance()));
-                                               }
-                                       }
-                               });
-       }
-
-       public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
-                       ExecutionEnvironment env) {
-               List<Vertex<Long, Double>> vertices = new 
ArrayList<Vertex<Long, Double>>();
-               vertices.add(new Vertex<Long, Double>(1L, 1.0));
-               vertices.add(new Vertex<Long, Double>(2L, 2.0));
-               vertices.add(new Vertex<Long, Double>(3L, 3.0));
-               vertices.add(new Vertex<Long, Double>(4L, 4.0));
-               vertices.add(new Vertex<Long, Double>(5L, 5.0));
-
-               return env.fromCollection(vertices);
-       }
-
-       public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
-                       ExecutionEnvironment env) {
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
-               edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
-               edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
-               edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
-               edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
-               edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
-               edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-
-               return env.fromCollection(edges);
-       }
-
-       /**
-        * Private constructor to prevent instantiation.
-        */
-       private ExampleUtils() {
-               throw new RuntimeException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
deleted file mode 100644
index ebf43d4..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.examples
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple 
statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * {{{
- *   <sourceVertexID>\t<targetVertexID>
- * }}}
- * If no arguments are provided, the example runs with a random graph of 100 
vertices.
- *
- */
-object GraphMetrics {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    /** create the graph **/
-    val graph: Graph[Long, NullValue, NullValue] = 
Graph.fromDataSet(getEdgeDataSet(env), env)
-
-    /** get the number of vertices **/
-    val numVertices = graph.numberOfVertices
-
-    /** get the number of edges **/
-    val numEdges = graph.numberOfEdges
-
-    /** compute the average node degree **/
-    val verticesWithDegrees = graph.getDegrees
-    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / 
numVertices).toDouble)
-
-    /** find the vertex with the maximum in-degree **/
-    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum in-degree **/
-    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
-
-    /** find the vertex with the maximum out-degree **/
-    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum out-degree **/
-    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
-
-    /** print the results **/
-    env.fromElements(numVertices).printOnTaskManager("Total number of 
vertices")
-    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
-    avgDegree.printOnTaskManager("Average node degree")
-    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-    minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-
-  }
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 1) {
-        edgesPath = args(0)
-        true
-      } else {
-        System.err.println("Usage: GraphMetrics <edges path>")
-        false
-      }
-    } else {
-      System.out.println("Executing GraphMetrics example with built-in default 
data.")
-      System.out.println("  Provide parameters to read input data from a 
file.")
-      System.out.println("  Usage: GraphMetrics <edges path>")
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, 
NullValue]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long)](
-        edgesPath,
-        fieldDelimiter = "\t").map(
-        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
-    } else {
-      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
-          for ( i <- 0 to numOutEdges ) {
-            val target: Long = ((Math.random() * numVertices) + 1).toLong
-            new Edge[Long, NullValue](key, target, NullValue.getInstance())
-          }
-      })
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var edgesPath: String = null
-  private var outputPath: String = null
-  private val numVertices = 100
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 167e31c..b3e1e30 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -367,6 +367,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                }
 
                /**
+                * Get the average degree.
+                *
+                * @return average degree
+                */
+               public float getAverageDegree() {
+                       return edgeCount / (float)vertexCount;
+               }
+
+               /**
                 * Get the number of triangle triplets.
                 *
                 * @return number of triangle triplets
@@ -453,6 +462,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                        return "vertex count: " + nf.format(vertexCount)
                                + "; edge count: " + nf.format(edgeCount)
+                               + "; average degree: " + 
nf.format(getAverageDegree())
                                + "; triangle triplet count: " + 
nf.format(triangleTripletCount)
                                + "; rectangle triplet count: " + 
nf.format(rectangleTripletCount)
                                + "; triplet count: " + nf.format(tripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 22f7733..909eea5 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -228,6 +228,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                }
 
                /**
+                * Get the average degree.
+                *
+                * @return average degree
+                */
+               public float getAverageDegree() {
+                       return edgeCount / (float)vertexCount;
+               }
+
+               /**
                 * Get the number of triplets.
                 *
                 * @return number of triplets
@@ -278,6 +287,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                        return "vertex count: " + nf.format(vertexCount)
                                + "; edge count: " + nf.format(edgeCount)
+                               + "; average degree: " + 
nf.format(getAverageDegree())
                                + "; triplet count: " + nf.format(tripletCount)
                                + "; maximum degree: " + 
nf.format(maximumDegree)
                                + "; maximum out degree: " + 
nf.format(maximumOutDegree)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 1d5b664..6bce42c 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -329,6 +329,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                }
 
                /**
+                * Get the average degree.
+                *
+                * @return average degree
+                */
+               public float getAverageDegree() {
+                       return edgeCount / (float)vertexCount;
+               }
+
+               /**
                 * Get the number of triangle triplets.
                 *
                 * @return number of triangle triplets
@@ -397,6 +406,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                        return "vertex count: " + nf.format(vertexCount)
                                + "; edge count: " + nf.format(edgeCount)
+                               + "; average degree: " + nf.format(getAverageDegree())
                                + "; triangle triplet count: " + nf.format(triangleTripletCount)
                                + "; rectangle triplet count: " + nf.format(rectangleTripletCount)
                                + "; triplet count: " + nf.format(tripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index d04fa7b..8012605 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -230,6 +230,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                }
 
                /**
+                * Get the average degree.
+                *
+                * @return average degree
+                */
+               public float getAverageDegree() {
+                       return edgeCount / (float)vertexCount;
+               }
+
+               /**
                 * Get the number of triplets.
                 *
                 * @return number of triplets
@@ -262,6 +271,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
                        return "vertex count: " + nf.format(vertexCount)
                                + "; edge count: " + nf.format(edgeCount)
+                               + "; average degree: " + nf.format(getAverageDegree())
                                + "; triplet count: " + nf.format(tripletCount)
                                + "; maximum degree: " + nf.format(maximumDegree)
                                + "; maximum triplets: " + nf.format(maximumTriplets);

Reply via email to