Repository: flink Updated Branches: refs/heads/master c4783c856 -> c4f9f0d78
http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java deleted file mode 100644 index 615d765..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAnalytic; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; - -import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; - -/** - * Driver for the library implementations of Global and Local Clustering Coefficient. - * - * This example reads a simple directed or undirected graph from a CSV file or - * generates an RMat graph with the given scale and edge factor then calculates - * the local clustering coefficient for each vertex and the global clustering - * coefficient for the graph. - * - * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient - * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient - * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient - * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient - */ -public class ClusteringCoefficient { - - public static final int DEFAULT_SCALE = 10; - - public static final int DEFAULT_EDGE_FACTOR = 16; - - public static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static void printUsage() { - System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" + - " vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." + - " Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" + - " is a clique).", 80)); - System.out.println(); - System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" + - " the vertex, and the number of edges between vertex neighbors.", 80)); - System.out.println(); - System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>"); - System.out.println(); - System.out.println("options:"); - System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); - System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]"); - System.out.println(); - System.out.println(" --output print"); - System.out.println(" --output hash"); - System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - - if (! parameters.has("directed")) { - printUsage(); - return; - } - boolean directedAlgorithm = parameters.getBoolean("directed"); - - int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); - - // global and local clustering coefficient results - GraphAnalytic gcc; - GraphAnalytic acc; - DataSet lcc; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph<LongValue, NullValue, NullValue> graph = reader - .keyType(LongValue.class); - - if (directedAlgorithm) { - gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; - - case "string": { - Graph<StringValue, NullValue, NullValue> graph = reader - .keyType(StringValue.class); - - if (directedAlgorithm) { - gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; - - default: - printUsage(); - return; - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .setParallelism(little_parallelism) - .generate(); - - if (directedAlgorithm) { - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } else { - Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } - } else { - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } else { - Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } - } - } break; - - default: - printUsage(); - return; - } - - switch (parameters.get("output", "")) { - case "print": - if (directedAlgorithm) { - for (Object e: lcc.collect()) { - org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result = - (org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e; - System.out.println(result.toVerboseString()); - } - } else { - for (Object e: lcc.collect()) { - org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result = - (org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e; - System.out.println(result.toVerboseString()); - } - } - System.out.println(gcc.getResult()); - System.out.println(acc.getResult()); - break; - - case "hash": - System.out.println(DataSetUtils.checksumHashCode(lcc)); - System.out.println(gcc.getResult()); - System.out.println(acc.getResult()); - break; - - case "csv": - String filename = parameters.get("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("Clustering Coefficient"); - - System.out.println(gcc.getResult()); - System.out.println(acc.getResult()); - break; - - default: - printUsage(); - return; - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java deleted file mode 100644 index 73bba2c..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.asm.simple.undirected.Simplify; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; - -import java.text.NumberFormat; - -/** - * Generate an RMat graph for Graph 500. - * - * Note that this does not yet implement permutation of vertex labels or edges. - * - * @see <a href="http://www.graph500.org/specifications">Graph 500</a> - */ -public class Graph500 { - - public static final int DEFAULT_SCALE = 10; - - public static final int DEFAULT_EDGE_FACTOR = 16; - - public static final boolean DEFAULT_SIMPLIFY = false; - - public static final boolean DEFAULT_CLIP_AND_FLIP = true; - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - - // Generate RMat graph - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY); - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (simplify) { - graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); - } - - DataSet<Tuple2<LongValue,LongValue>> edges = graph - .getEdges() - .project(0, 1); - - // Print, hash, or write RMat graph to disk - switch (parameters.get("output", "")) { - case "print": - edges.print(); - break; - - case "hash": - System.out.println(DataSetUtils.checksumHashCode(edges)); - break; - - case "csv": - String filename = parameters.get("filename"); - - String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER); - String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER); - - edges.writeAsCsv(filename, row_delimiter, field_delimiter); - - env.execute(); - break; - default: - System.out.println("A Graph500 generator using the Recursive Matrix (RMat) graph generator."); - System.out.println(); - System.out.println("The graph matrix contains 2^scale vertices although not every vertex will"); - System.out.println("be represented in an edge. The number of edges is edge_factor * 2^scale edges"); - System.out.println("although some edges may be duplicates."); - System.out.println(); - System.out.println("Note: this does not yet implement permutation of vertex labels or edges."); - System.out.println(); - System.out.println("usage:"); - System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); - System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); - System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + - " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]"); - - return; - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java deleted file mode 100644 index e7b47bf..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.examples.utils.ExampleUtils; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; - -/** - * This example illustrates how to use Gelly metrics methods and get simple statistics - * from the input graph. - * - * The program creates a random graph and computes and prints - * the following metrics: - * - number of vertices - * - number of edges - * - average node degree - * - the vertex ids with the max/min in- and out-degrees - * - * The input file is expected to contain one edge per line, - * with long IDs and no values, in the following format: - * "<sourceVertexID>\t<targetVertexID>". - * If no arguments are provided, the example runs with a random graph of 100 vertices. - * - */ -public class GraphMetrics implements ProgramDescription { - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - /** create the graph **/ - Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env); - - /** get the number of vertices **/ - long numVertices = graph.numberOfVertices(); - - /** get the number of edges **/ - long numEdges = graph.numberOfEdges(); - - /** compute the average node degree **/ - DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees(); - - DataSet<Double> avgNodeDegree = verticesWithDegrees - .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices)); - - /** find the vertex with the maximum in-degree **/ - DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId()); - - /** find the vertex with the minimum in-degree **/ - DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId()); - - /** find the vertex with the maximum out-degree **/ - DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId()); - - /** find the vertex with the minimum out-degree **/ - DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId()); - - /** print the results **/ - ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices"); - ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges"); - ExampleUtils.printResult(avgNodeDegree, "Average node degree"); - ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree"); - ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree"); - ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree"); - ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree"); - - env.execute(); - } - - @SuppressWarnings("serial") - private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> { - - private long numberOfVertices; - - public AvgNodeDegreeMapper(long numberOfVertices) { - this.numberOfVertices = numberOfVertices; - } - - public Double map(Tuple2<Long, LongValue> sumTuple) { - return (double) (sumTuple.f1.getValue() / numberOfVertices) ; - } - } - - @SuppressWarnings("serial") - private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> { - public Long map(Tuple2<Long, LongValue> value) { return value.f0; } - } - - @Override - public String getDescription() { - return "Graph Metrics Example"; - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static String edgesInputPath = null; - - static final int NUM_VERTICES = 100; - - static final long SEED = 9876; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length != 1) { - System.err.println("Usage: GraphMetrics <input edges>"); - return false; - } - - fileOutput = true; - edgesInputPath = args[0]; - } else { - System.out.println("Executing Graph Metrics example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println("Usage: GraphMetrics <input edges>"); - } - return true; - } - - @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n").fieldDelimiter("\t") - .types(Long.class, Long.class).map( - new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) { - return new Edge<Long, NullValue>(value.f0, value.f1, - NullValue.getInstance()); - } - }); - } else { - return ExampleUtils.getRandomEdges(env, NUM_VERTICES); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java deleted file mode 100644 index f70d5dc..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.simple.directed.Simplify; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.graph.library.link_analysis.HITS.Result; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; - -/** - * Driver for the library implementation of HITS (Hubs and Authorities). - * - * This example reads a simple, undirected graph from a CSV file or generates - * an undirected RMat graph with the given scale and edge factor then calculates - * hub and authority scores for each vertex. - * - * @see org.apache.flink.graph.library.link_analysis.HITS - */ -public class HITS { - - public static final int DEFAULT_ITERATIONS = 10; - - public static final int DEFAULT_SCALE = 10; - - public static final int DEFAULT_EDGE_FACTOR = 16; - - private static void printUsage() { - System.out.println(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" + - " scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" + - " and good \"authorities\" are linked from good \"hubs\".", 80)); - System.out.println(); - System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>"); - System.out.println(); - System.out.println("options:"); - System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); - System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]"); - System.out.println(); - System.out.println(" --output print"); - System.out.println(" --output hash"); - System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS); - - DataSet hits; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - hits = reader - .keyType(LongValue.class) - .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations)); - } break; - - case "string": { - hits = reader - .keyType(StringValue.class) - .run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations)); - } break; - - default: - printUsage(); - return; - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (scale > 32) { - hits = graph - .run(new Simplify<LongValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations)); - } else { - hits = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new Simplify<IntValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations)); - } - } break; - - default: - printUsage(); - return; - } - - switch (parameters.get("output", "")) { - case "print": - for (Object e: hits.collect()) { - System.out.println(((Result)e).toVerboseString()); - } - break; - - case "hash": - System.out.println(DataSetUtils.checksumHashCode(hits)); - break; - - case "csv": - String filename = parameters.get("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute(); - break; - default: - printUsage(); - return; - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java deleted file mode 100644 index 2845e2d..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.text.StrBuilder; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.simple.undirected.Simplify; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.graph.library.similarity.JaccardIndex.Result; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; - -import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; - -/** - * Driver for the library implementation of Jaccard Index. - * - * This example reads a simple, undirected graph from a CSV file or generates - * an undirected RMat graph with the given scale and edge factor then calculates - * all non-zero Jaccard Index similarity scores between vertices. - * - * @see org.apache.flink.graph.library.similarity.JaccardIndex - */ -public class JaccardIndex { - - public static final int DEFAULT_SCALE = 10; - - public static final int DEFAULT_EDGE_FACTOR = 16; - - public static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" + - " neighborhoods and is computed as the number of shared neighbors divided by the number of" + - " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" + - " shared).", 80)) - .appendNewLine() - .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + - " number of shared neighbors, and the number of distinct neighbors.", 80)) - .appendNewLine() - .appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - - int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); - - DataSet ji; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - ji = reader - .keyType(LongValue.class) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } break; - - case "string": { - ji = reader - .keyType(StringValue.class) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .setParallelism(little_parallelism) - .generate(); - - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - if (scale > 32) { - ji = graph - .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - ji = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); - } - - switch (parameters.get("output", "")) { - case "print": - for (Object e: ji.collect()) { - Result result = (Result)e; - System.out.println(result.toVerboseString()); - } - break; - - case "hash": - System.out.println(DataSetUtils.checksumHashCode(ji)); - break; - - case "csv": - String filename = parameters.get("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute(); - break; - - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java deleted file mode 100644 index 43c5eba..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.simple.undirected.Simplify; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; - -/** - * Driver for the library implementation of Triangle Listing. - * - * This example reads a simple directed or undirected graph from a CSV file or - * generates an RMat graph with the given scale and edge factor then lists - * all triangles. - * - * @see org.apache.flink.graph.library.clustering.directed.TriangleListing - * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing - */ -public class TriangleListing { - - public static final int DEFAULT_SCALE = 10; - - public static final int DEFAULT_EDGE_FACTOR = 16; - - public static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static void printUsage() { - System.out.println(WordUtils.wrap("Lists all triangles in a graph.", 80)); - System.out.println(); - System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" + - " for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80)); - System.out.println(); - System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>"); - System.out.println(); - System.out.println("options:"); - System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); - System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]"); - System.out.println(); - System.out.println(" --output print"); - System.out.println(" --output hash"); - System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - if (! parameters.has("directed")) { - printUsage(); - return; - } - boolean directedAlgorithm = parameters.getBoolean("directed"); - - DataSet tl; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph<LongValue, NullValue, NullValue> graph = reader - .keyType(LongValue.class); - - if (directedAlgorithm) { - tl = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()); - } else { - tl = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()); - } - } break; - - case "string": { - Graph<StringValue, NullValue, NullValue> graph = reader - .keyType(StringValue.class); - - if (directedAlgorithm) { - tl = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>()); - } else { - tl = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>()); - } - } break; - - default: - printUsage(); - return; - } - - - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (directedAlgorithm) { - if (scale > 32) { - tl = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()); - } else { - tl = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>()); - } - } else { - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - graph = graph - .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); - - if (scale > 32) { - tl = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)) - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()); - } else { - tl = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)) - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()); - } - } - } break; - - default: - printUsage(); - return; - } - - switch (parameters.get("output", "")) { - case "print": - if (directedAlgorithm) { - for (Object e: tl.collect()) { - org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result = - (org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e; - System.out.println(result.toVerboseString()); - } - } else { - tl.print(); - } - break; - - case "hash": - System.out.println(DataSetUtils.checksumHashCode(tl)); - break; - - case "csv": - String filename = parameters.get("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute(); - break; - default: - printUsage(); - return; - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java deleted file mode 100644 index b1bc831..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples.utils; - -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Vertex; -import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; - -public class ExampleUtils { - - @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) - public static void printResult(DataSet set, String msg) { - set.output(new PrintingOutputFormatWithMessage(msg) { - }); - } - - public static class PrintingOutputFormatWithMessage<T> implements - OutputFormat<T> { - - private static final long serialVersionUID = 1L; - - private transient PrintStream stream; - - private transient String prefix; - - private String message; - - // -------------------------------------------------------------------------------------------- - - /** - * Instantiates a printing output format that prints to standard out. - */ - public PrintingOutputFormatWithMessage() { - } - - public PrintingOutputFormatWithMessage(String msg) { - this.message = msg; - } - - @Override - public void open(int taskNumber, int numTasks) { - // get the target stream - this.stream = System.out; - - // set the prefix to message - this.prefix = message + ": "; - } - - @Override - public void writeRecord(T record) { - if (this.prefix != null) { - this.stream.println(this.prefix + record.toString()); - } else { - this.stream.println(record.toString()); - } - } - - @Override - public void close() { - this.stream = null; - this.prefix = null; - } - - @Override - public String toString() { - return "Print to System.out"; - } - - @Override - public void configure(Configuration parameters) { - } - } - - @SuppressWarnings("serial") - public static DataSet<Vertex<Long, NullValue>> getVertexIds( - ExecutionEnvironment env, final long numVertices) { - return env.generateSequence(1, numVertices).map( - new MapFunction<Long, Vertex<Long, NullValue>>() { - public Vertex<Long, NullValue> map(Long l) { - return new Vertex<Long, NullValue>(l, NullValue - .getInstance()); - } - }); - } - - @SuppressWarnings("serial") - public static DataSet<Edge<Long, NullValue>> getRandomEdges( - ExecutionEnvironment env, final long numVertices) { - return env.generateSequence(1, numVertices).flatMap( - new FlatMapFunction<Long, Edge<Long, NullValue>>() { - @Override - public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception { - int numOutEdges = (int) (Math.random() * (numVertices / 2)); - for (int i = 0; i < numOutEdges; i++) { - long target = (long) (Math.random() * numVertices) + 1; - out.collect(new Edge<Long, NullValue>(key, target, - NullValue.getInstance())); - } - } - }); - } - - public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData( - ExecutionEnvironment env) { - List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>(); - vertices.add(new Vertex<Long, Double>(1L, 1.0)); - vertices.add(new Vertex<Long, Double>(2L, 2.0)); - vertices.add(new Vertex<Long, Double>(3L, 3.0)); - vertices.add(new Vertex<Long, Double>(4L, 4.0)); - vertices.add(new Vertex<Long, Double>(5L, 5.0)); - - return env.fromCollection(vertices); - } - - public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData( - ExecutionEnvironment env) { - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, 12.0)); - edges.add(new Edge<Long, Double>(1L, 3L, 13.0)); - edges.add(new Edge<Long, Double>(2L, 3L, 23.0)); - edges.add(new Edge<Long, Double>(3L, 4L, 34.0)); - edges.add(new Edge<Long, Double>(3L, 5L, 35.0)); - edges.add(new Edge<Long, Double>(4L, 5L, 45.0)); - edges.add(new Edge<Long, Double>(5L, 1L, 51.0)); - - return env.fromCollection(edges); - } - - /** - * Private constructor to prevent instantiation. - */ - private ExampleUtils() { - throw new RuntimeException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala deleted file mode 100644 index ebf43d4..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.examples - -import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala._ -import org.apache.flink.types.NullValue -import org.apache.flink.graph.Edge -import org.apache.flink.util.Collector - -/** - * This example illustrates how to use Gelly metrics methods and get simple statistics - * from the input graph. - * - * The program creates a random graph and computes and prints - * the following metrics: - * - number of vertices - * - number of edges - * - average node degree - * - the vertex ids with the max/min in- and out-degrees - * - * The input file is expected to contain one edge per line, - * with long IDs and no values, in the following format: - * {{{ - * <sourceVertexID>\t<targetVertexID> - * }}} - * If no arguments are provided, the example runs with a random graph of 100 vertices. - * - */ -object GraphMetrics { - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - /** create the graph **/ - val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env) - - /** get the number of vertices **/ - val numVertices = graph.numberOfVertices - - /** get the number of edges **/ - val numEdges = graph.numberOfEdges - - /** compute the average node degree **/ - val verticesWithDegrees = graph.getDegrees - val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble) - - /** find the vertex with the maximum in-degree **/ - val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1) - - /** find the vertex with the minimum in-degree **/ - val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1) - - /** find the vertex with the maximum out-degree **/ - val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1) - - /** find the vertex with the minimum out-degree **/ - val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1) - - /** print the results **/ - env.fromElements(numVertices).printOnTaskManager("Total number of vertices") - env.fromElements(numEdges).printOnTaskManager("Total number of edges") - avgDegree.printOnTaskManager("Average node degree") - maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") - minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") - maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") - minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") - - } - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 1) { - edgesPath = args(0) - true - } else { - System.err.println("Usage: GraphMetrics <edges path>") - false - } - } else { - System.out.println("Executing GraphMetrics example with built-in default data.") - System.out.println(" Provide parameters to read input data from a file.") - System.out.println(" Usage: GraphMetrics <edges path>") - true - } - } - - private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { - if (fileOutput) { - env.readCsvFile[(Long, Long)]( - edgesPath, - fieldDelimiter = "\t").map( - in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance())) - } else { - env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]]( - (key: Long, out: Collector[Edge[Long, NullValue]]) => { - val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt - for ( i <- 0 to numOutEdges ) { - val target: Long = ((Math.random() * numVertices) + 1).toLong - new Edge[Long, NullValue](key, target, NullValue.getInstance()) - } - }) - } - } - - private var fileOutput: Boolean = false - private var edgesPath: String = null - private var outputPath: String = null - private val numVertices = 100 -} http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java index 167e31c..b3e1e30 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java @@ -367,6 +367,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { } /** + * Get the average degree. + * + * @return average degree + */ + public float getAverageDegree() { + return edgeCount / (float)vertexCount; + } + + /** * Get the number of triangle triplets. * * @return number of triangle triplets @@ -453,6 +462,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return "vertex count: " + nf.format(vertexCount) + "; edge count: " + nf.format(edgeCount) + + "; average degree: " + nf.format(getAverageDegree()) + "; triangle triplet count: " + nf.format(triangleTripletCount) + "; rectangle triplet count: " + nf.format(rectangleTripletCount) + "; triplet count: " + nf.format(tripletCount) http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java index 22f7733..909eea5 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java @@ -228,6 +228,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { } /** + * Get the average degree. + * + * @return average degree + */ + public float getAverageDegree() { + return edgeCount / (float)vertexCount; + } + + /** * Get the number of triplets. * * @return number of triplets @@ -278,6 +287,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return "vertex count: " + nf.format(vertexCount) + "; edge count: " + nf.format(edgeCount) + + "; average degree: " + nf.format(getAverageDegree()) + "; triplet count: " + nf.format(tripletCount) + "; maximum degree: " + nf.format(maximumDegree) + "; maximum out degree: " + nf.format(maximumOutDegree) http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java index 1d5b664..6bce42c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java @@ -329,6 +329,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { } /** + * Get the average degree. + * + * @return average degree + */ + public float getAverageDegree() { + return edgeCount / (float)vertexCount; + } + + /** * Get the number of triangle triplets. * * @return number of triangle triplets @@ -397,6 +406,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return "vertex count: " + nf.format(vertexCount) + "; edge count: " + nf.format(edgeCount) + + "; average degree: " + nf.format(getAverageDegree()) + "; triangle triplet count: " + nf.format(triangleTripletCount) + "; rectangle triplet count: " + nf.format(rectangleTripletCount) + "; triplet count: " + nf.format(tripletCount) http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java index d04fa7b..8012605 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -230,6 +230,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { } /** + * Get the average degree. + * + * @return average degree + */ + public float getAverageDegree() { + return edgeCount / (float)vertexCount; + } + + /** * Get the number of triplets. * * @return number of triplets @@ -262,6 +271,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return "vertex count: " + nf.format(vertexCount) + "; edge count: " + nf.format(edgeCount) + + "; average degree: " + nf.format(getAverageDegree()) + "; triplet count: " + nf.format(tripletCount) + "; maximum degree: " + nf.format(maximumDegree) + "; maximum triplets: " + nf.format(maximumTriplets);
