http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java index 93a96c4..ca0c167 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java @@ -18,315 +18,125 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import 
org.apache.flink.types.StringValue; - -import java.text.NumberFormat; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.BooleanParameter; +import org.apache.flink.graph.drivers.parameter.ChoiceParameter; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.types.CopyableValue; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * Driver for the library implementation of Triangle Listing. - * - * This example reads a simple directed or undirected graph from a CSV file or - * generates an RMat graph with the given scale and edge factor then lists - * all triangles. + * Driver for directed and undirected triangle listing algorithm and analytic. * * @see org.apache.flink.graph.library.clustering.directed.TriangleListing + * @see org.apache.flink.graph.library.clustering.directed.TriadicCensus * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing + * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus */ -public class TriangleListing { - - private static final int DEFAULT_SCALE = 10; - - private static final int DEFAULT_EDGE_FACTOR = 16; +public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends SimpleDriver<PrintableResult> +implements Driver<K, VV, EV>, CSV, Hash, Print { - private static final boolean DEFAULT_TRIADIC_CENSUS = true; - - private static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("Lists all triangles in a graph.", 80)) - .appendNewLine() - .appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" + - " for directed graphs a bitmask 
indicating the presence of the six potential connecting edges.", 80)) - .appendNewLine() - .appendln("usage: TriangleListing --directed <true | false> [--triadic_census <true | false>] --input <csv | rmat> --output <print | hash | csv>") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); - } + private static final String DIRECTED = "directed"; - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); + private static final String UNDIRECTED = "undirected"; - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); + private ChoiceParameter order = new ChoiceParameter(this, "order") + .addChoices(DIRECTED, UNDIRECTED); - if (! 
parameters.has("directed")) { - throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); - } - boolean directedAlgorithm = parameters.getBoolean("directed"); + private BooleanParameter sortTriangleVertices = new BooleanParameter(this, "sort_triangle_vertices"); - int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); - boolean triadic_census = parameters.getBoolean("triadic_census", DEFAULT_TRIADIC_CENSUS); + private BooleanParameter computeTriadicCensus = new BooleanParameter(this, "triadic_census"); - GraphAnalytic tc = null; - DataSet tl; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.getRequired("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph<LongValue, NullValue, NullValue> graph = reader - .keyType(LongValue.class); - - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - } - - if (triadic_census) { - tc = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - if 
(parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false) - .setParallelism(little_parallelism)); - } - - if (triadic_census) { - tc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; - - case "string": { - Graph<StringValue, NullValue, NullValue> graph = reader - .keyType(StringValue.class); - - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - } - - if (triadic_census) { - tc = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false) - .setParallelism(little_parallelism)); - } - - if (triadic_census) { - tc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; - - default: - throw new 
ProgramParametrizationException(getUsage("invalid CSV type")); - } + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + private GraphAnalytic<K, VV, EV, ? extends PrintableResult> triadicCensus; - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (directedAlgorithm) { - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> simpleGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - - if (triadic_census) { - tc = simpleGraph - .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = simpleGraph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - Graph<LongValue, NullValue, NullValue> simpleGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - - if (triadic_census) { - tc = simpleGraph - .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = simpleGraph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } else { - boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + @Override + public String getName() { + return this.getClass().getSimpleName(); + } - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> simpleGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)); + @Override + public String getShortDescription() { + return "list triangles"; + } - if (triadic_census) { - tc = simpleGraph - .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = simpleGraph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - Graph<IntValue, NullValue, NullValue> simpleGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)); + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("List all triangles graph.") + .appendNewLine() + .append("The algorithm result contains three vertex IDs. 
For the directed algorithm " + + "the result contains an additional bitmask indicating the presence of the six " + + "potential connecting edges.") + .toString(), 80); + } - if (triadic_census) { - tc = simpleGraph - .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - tl = simpleGraph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + int lp = littleParallelism.getValue().intValue(); + + switch (order.getValue()) { + case DIRECTED: + result = graph + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>() + .setSortTriangleVertices(sortTriangleVertices.getValue()) + .setLittleParallelism(lp)); + + if (computeTriadicCensus.getValue()) { + triadicCensus = graph + .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>() + .setLittleParallelism(lp)); } - } break; + break; - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); - } + case UNDIRECTED: + result = graph + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>() + .setSortTriangleVertices(sortTriangleVertices.getValue()) + .setLittleParallelism(lp)); - switch (parameters.get("output", "")) { - case "print": - System.out.println(); - if (directedAlgorithm) { - for (Object e: tl.collect()) { - org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result = - (org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e; - System.out.println(result.toPrintableString()); - } - } else { - tl.print(); + if (computeTriadicCensus.getValue()) { + triadicCensus = graph + .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>() + 
.setLittleParallelism(lp)); } break; + } + } - case "hash": - System.out.println(); - System.out.println(DataSetUtils.checksumHashCode(tl)); - break; - - case "csv": - String filename = parameters.getRequired("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + @Override + public void hash(String executionName) throws Exception { + super.hash(executionName); + printAnalytics(); + } - tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter); + @Override + public void print(String executionName) throws Exception { + super.print(executionName); + printAnalytics(); + } - env.execute(); - break; - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); - } + @Override + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + super.writeCSV(filename, lineDelimiter, fieldDelimiter); + printAnalytics(); + } - if (tc != null) { + private void printAnalytics() { + if (computeTriadicCensus.getValue()) { System.out.print("Triadic census:\n "); - System.out.println(tc.getResult().toString().replace(";", "\n ")); + System.out.println(triadicCensus.getResult().toPrintableString().replace(";", "\n ")); } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java new file mode 100644 index 0000000..e9d648a --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.parameter; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.drivers.parameter.IterationConvergence.Value; + +/** + * Iterative algorithms which converge can be terminated with a maximum number + * of iterations or a convergence threshold which stops computation when the + * total change in scores is below a given delta. 
+ * + * If the command-line configuration specifies neither a number of iterations + * nor a convergence threshold then the default number of iterations is used + * with the threshold left at Double.MAX_VALUE. Otherwise any value that is not + * configured defaults to its maximum (Integer.MAX_VALUE or Double.MAX_VALUE). + */ +public class IterationConvergence +implements Parameter<Value> { + + private final int defaultIterations; + + private final Value value = new Value(); + + /** + * Add this parameter to the list of parameters stored by owner. + * + * @param owner the {@link Parameterized} using this {@link Parameter} + * @param defaultIterations the default number of iterations if neither + * the number of iterations nor the convergence + * threshold is specified + */ + public IterationConvergence(ParameterizedBase owner, int defaultIterations) { + owner.addParameter(this); + this.defaultIterations = defaultIterations; + } + + @Override + public String getUsage() { + return "[--iterations ITERATIONS] [--convergence_threshold CONVERGENCE_THRESHOLD]"; + } + + @Override + public void configure(ParameterTool parameterTool) { + if (!parameterTool.has("iterations") && !parameterTool.has("convergence_threshold")) { + // no configuration so use default iterations and maximum threshold + value.iterations = defaultIterations; + value.convergenceThreshold = Double.MAX_VALUE; + } else { + // use configured values and maximum default for unset values + value.iterations = parameterTool.getInt("iterations", Integer.MAX_VALUE); + Util.checkParameter(value.iterations > 0, + "iterations must be greater than zero"); + + value.convergenceThreshold = parameterTool.getDouble("convergence_threshold", Double.MAX_VALUE); + Util.checkParameter(value.convergenceThreshold > 0, + "convergence threshold must be greater than zero"); + } + } + + @Override + public Value getValue() { + return value; + } + + /** + * Encapsulate the number of iterations and the convergence threshold. 
+ */ + public static class Value { + public int iterations; + public double convergenceThreshold; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java deleted file mode 100644 index 6651739..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.examples; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData; -import org.apache.flink.graph.library.GSAConnectedComponents; -import org.apache.flink.types.NullValue; - -/** - * This example shows how to use Gelly's library methods. - * You can find all available library methods in {@link org.apache.flink.graph.library}. - * - * In particular, this example uses the {@link GSAConnectedComponents} - * library method to compute the connected components of the input graph. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\n1\t3\n</code> defines two edges, - * 1-2 with and 1-3. 
- * - * Usage <code>ConnectedComponents <edge path> <result path> - * <number of iterations> </code><br> - * If no parameters are provided, the program is run with default data from - * {@link ConnectedComponentsDefaultData} - */ -public class ConnectedComponents implements ProgramDescription { - - @SuppressWarnings("serial") - public static void main(String [] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env); - - Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() { - @Override - public Long map(Long value) throws Exception { - return value; - } - }, env); - - DataSet<Vertex<Long, Long>> verticesWithMinIds = graph - .run(new GSAConnectedComponents<Long, Long, NullValue>(maxIterations)); - - // emit result - if (fileOutput) { - verticesWithMinIds.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Connected Components Example"); - } else { - verticesWithMinIds.print(); - } - } - - @Override - public String getDescription() { - return "Connected Components Example"; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS; - - private static boolean parseParameters(String [] args) { - if(args.length > 0) { - if(args.length != 3) { - System.err.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>"); - return false; - } - - fileOutput = true; - edgeInputPath = args[0]; - outputPath = args[1]; - maxIterations = 
Integer.parseInt(args[2]); - - } else { - System.out.println("Executing ConnectedComponents example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>"); - } - - return true; - } - - @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - @Override - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { - return new Edge<>(value.f0, value.f1, NullValue.getInstance()); - } - }); - } else { - return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java index 35f07b0..1cd3549 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java @@ -114,7 +114,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription { public Double gather(Neighbor<Double, Double> neighbor) { return neighbor.getNeighborValue() + neighbor.getEdgeValue(); } - }; + } @SuppressWarnings("serial") 
private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> { @@ -122,7 +122,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription { public Double sum(Double newValue, Double currentValue) { return Math.min(newValue, currentValue); } - }; + } @SuppressWarnings("serial") private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> { http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java new file mode 100644 index 0000000..ae92943 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers.parameter; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class IterationConvergenceTest +extends ParameterTestBase { + + private IterationConvergence parameter; + + @Before + public void setup() { + super.setup(); + + parameter = new IterationConvergence(owner, 10); + } + + @Test + public void testWithIterations() { + parameter.configure(ParameterTool.fromArgs(new String[]{"--iterations", "42"})); + Assert.assertEquals(42, parameter.getValue().iterations); + Assert.assertEquals(Double.MAX_VALUE, parameter.getValue().convergenceThreshold, 0.000001); + } + + @Test + public void testWithConvergenceThreshold() { + parameter.configure(ParameterTool.fromArgs(new String[]{"--convergence_threshold", "42"})); + Assert.assertEquals(Integer.MAX_VALUE, parameter.getValue().iterations); + Assert.assertEquals(42.0, parameter.getValue().convergenceThreshold, 0.000001); + } + + @Test + public void testWithBoth() { + parameter.configure(ParameterTool.fromArgs(new String[]{"--iterations", "42", "--convergence_threshold", "42"})); + Assert.assertEquals(42, parameter.getValue().iterations); + Assert.assertEquals(42.0, parameter.getValue().convergenceThreshold, 0.000001); + } + + @Test + public void testWithNeither() { + parameter.configure(ParameterTool.fromArgs(new String[]{})); + Assert.assertEquals(10, parameter.getValue().iterations); + Assert.assertEquals(Double.MAX_VALUE, parameter.getValue().convergenceThreshold, 0.000001); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java deleted file mode 100644 index d0de8dc..0000000 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.test.examples; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.graph.examples.ConnectedComponents; -import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.test.util.TestBaseUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class ConnectedComponentsITCase extends MultipleProgramsTestBase { - - private String edgesPath; - - private String resultPath; - - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public ConnectedComponentsITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - - File edgesFile = tempFolder.newFile(); - Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8); - edgesPath = edgesFile.toURI().toString(); - } - - @Test - public void testConnectedComponentsExample() throws Exception { - ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""}); - expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID; - } - - @After - public void after() throws Exception { - TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index cbbfb02..71baaa9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -529,7 +529,13 @@ public class Graph<K, VV, EV> { TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); - TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null); + TypeInformation<NV> valueType; + + if (mapper instanceof ResultTypeQueryable) { + valueType = ((ResultTypeQueryable) mapper).getProducedType(); + } else { + valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null); + } TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo( Vertex.class, keyType, valueType); @@ -573,7 +579,13 @@ public class Graph<K, VV, EV> { TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); - TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null); + TypeInformation<NV> valueType; + + if (mapper instanceof ResultTypeQueryable) { + valueType = ((ResultTypeQueryable) mapper).getProducedType(); + } else { + valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null); + } TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo( Edge.class, keyType, keyType, valueType); http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java index 3cd8f05..959b816 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java @@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.ScatterFunction; -import org.apache.flink.graph.utils.NullValueEdgeMapper; +import org.apache.flink.graph.utils.GraphUtils.MapTo; import org.apache.flink.types.NullValue; /** @@ -72,7 +73,7 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV> TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1); Graph<K, VV, NullValue> undirectedGraph = graph - .mapEdges(new NullValueEdgeMapper<K, EV>()) + .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance())) .getUndirected(); return undirectedGraph.runScatterGatherIteration( http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java index 327de73..1680f38 100644 --- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; @@ -29,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.graph.utils.NullValueEdgeMapper; +import org.apache.flink.graph.utils.GraphUtils.MapTo; import org.apache.flink.types.NullValue; /** @@ -73,7 +74,7 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV> TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1); Graph<K, VV, NullValue> undirectedGraph = graph - .mapEdges(new NullValueEdgeMapper<K, EV>()) + .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance())) .getUndirected(); return undirectedGraph.runGatherSumApplyIteration( @@ -87,7 +88,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV> // Connected Components UDFs // -------------------------------------------------------------------------------------------- - @SuppressWarnings("serial") private static final class GatherNeighborIds<VV extends Comparable<VV>> extends GatherFunction<VV, NullValue, VV> implements ResultTypeQueryable<VV> { @@ -108,7 +108,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV> } } - @SuppressWarnings("serial") private static final class SelectMinId<VV extends Comparable<VV>> extends 
SumFunction<VV, NullValue, VV> implements ResultTypeQueryable<VV> { @@ -129,7 +128,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV> } } - @SuppressWarnings("serial") private static final class UpdateComponentId<K, VV extends Comparable<VV>> extends ApplyFunction<K, VV, VV> implements ResultTypeQueryable<VV> { http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index 96e5afc..0064a68 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.ScatterFunction; -import org.apache.flink.graph.utils.NullValueEdgeMapper; +import org.apache.flink.graph.utils.GraphUtils.MapTo; import org.apache.flink.types.NullValue; import java.util.HashMap; @@ -76,7 +77,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV> TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1); // iteratively adopt the most frequent label among the neighbors of each 
vertex return input - .mapEdges(new NullValueEdgeMapper<K, EV>()) + .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance())) .runScatterGatherIteration( new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations) .getVertices(); http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java index 236272f..6fe753a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -35,9 +35,9 @@ import org.apache.flink.graph.EdgeOrder; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; -import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result; import org.apache.flink.graph.asm.result.PrintableResult; import org.apache.flink.graph.asm.result.TertiaryResult; +import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.ByteValue; http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- 
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 9aca8a4..eda5c1c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -32,8 +32,8 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.asm.result.PrintableResult; -import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; import org.apache.flink.graph.asm.result.UnaryResult; +import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java index 57743e8..1dfa3ee 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java @@ -40,6 +40,8 @@ import org.apache.flink.graph.Vertex; import 
org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.asm.result.UnaryResult; import org.apache.flink.graph.library.link_analysis.Functions.SumScore; import org.apache.flink.graph.library.link_analysis.PageRank.Result; import org.apache.flink.graph.utils.GraphUtils; @@ -500,7 +502,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { * @param <T> ID type */ public static class Result<T> - extends Tuple2<T, DoubleValue> { + extends Tuple2<T, DoubleValue> + implements PrintableResult, UnaryResult<T> { public static final int HASH_SEED = 0x4010af29; private Murmur3_32 hasher = new Murmur3_32(HASH_SEED); @@ -518,7 +521,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { return f1; } - public String toVerboseString() { + @Override + public String toPrintableString() { return "Vertex ID: " + getVertexId0() + ", PageRank score: " + getPageRankScore(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index 7d77541..6aaf9f2 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -34,8 +34,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import 
org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; -import org.apache.flink.graph.asm.result.PrintableResult; import org.apache.flink.graph.asm.result.BinaryResult; +import org.apache.flink.graph.asm.result.PrintableResult; import org.apache.flink.graph.library.similarity.AdamicAdar.Result; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 3b36715..0c80e6d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; -import org.apache.flink.graph.asm.result.PrintableResult; import org.apache.flink.graph.asm.result.BinaryResult; +import org.apache.flink.graph.asm.result.PrintableResult; import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java index 2e0dffc..78fb378 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java @@ -20,7 +20,10 @@ package org.apache.flink.graph.utils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.types.LongValue; import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO; @@ -62,7 +65,7 @@ public class GraphUtils { * @param <O> output type */ public static class MapTo<I, O> - implements MapFunction<I, O> { + implements MapFunction<I, O>, ResultTypeQueryable<O> { private final O value; /** @@ -78,6 +81,11 @@ public class GraphUtils { public O map(I o) throws Exception { return value; } + + @Override + public TypeInformation<O> getProducedType() { + return (TypeInformation<O>)TypeExtractor.createTypeInfo(value.getClass()); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java deleted file mode 100644 index 2bd4719..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java +++ /dev/null @@ -1,32 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.utils; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.graph.Edge; -import org.apache.flink.types.NullValue; - -public class NullValueEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, NullValue> { - - private static final long serialVersionUID = 1L; - - public NullValue map(Edge<K, EV> edge) { - return NullValue.getInstance(); - } -}
