Repository: flink Updated Branches: refs/heads/master fb1ef0812 -> 59cf7032f
[hotfix] [gelly] Driver updates - refactor SimpleDriver to call internal plan method - add CLI parameters for RMatGraph, AdamicAdar, JaccardIndex - remove unused data from VertexDegrees - JaccardIndex now filters on > rather than >= - handle null in ValueArrayTypeInfo - add NonForwardingIdentityMapper to GraphUtils Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59cf7032 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59cf7032 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59cf7032 Branch: refs/heads/master Commit: 59cf7032f3de35ba461d22411fa45f0286c62981 Parents: fb1ef08 Author: Greg Hogan <[email protected]> Authored: Thu Apr 13 14:07:29 2017 -0400 Committer: Greg Hogan <[email protected]> Committed: Wed Apr 19 11:44:55 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/flink/graph/Runner.java | 14 +++-- .../apache/flink/graph/drivers/AdamicAdar.java | 20 +++++-- .../graph/drivers/ClusteringCoefficient.java | 30 ++++++---- .../graph/drivers/ConnectedComponents.java | 7 +-- .../apache/flink/graph/drivers/EdgeList.java | 5 +- .../org/apache/flink/graph/drivers/HITS.java | 9 +-- .../flink/graph/drivers/JaccardIndex.java | 27 +++++++-- .../apache/flink/graph/drivers/PageRank.java | 9 +-- .../flink/graph/drivers/SimpleDriver.java | 63 +++++++++++++++++--- .../flink/graph/drivers/TriangleListing.java | 32 ++++++---- .../flink/graph/drivers/input/RMatGraph.java | 3 + .../degree/annotate/directed/VertexDegrees.java | 30 +++++----- .../graph/library/link_analysis/Functions.java | 4 +- .../graph/library/link_analysis/PageRank.java | 5 ++ .../graph/library/similarity/JaccardIndex.java | 7 ++- .../types/valuearray/ValueArrayTypeInfo.java | 13 +++- .../valuearray/ValueArrayTypeInfoFactory.java | 5 +- .../apache/flink/graph/utils/GraphUtils.java | 27 ++++++++- .../library/similarity/JaccardIndexTest.java | 2 + 19 files changed, 224 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java index 4b6cf42..5ffe681 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java @@ -20,6 +20,7 @@ package org.apache.flink.graph; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.utils.ParameterTool; @@ -188,20 +189,21 @@ public class Runner { public static void main(String[] args) throws Exception { // Set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + ExecutionConfig config = env.getConfig(); // should not have any non-Flink data types - env.getConfig().disableAutoTypeRegistration(); - env.getConfig().disableForceAvro(); - env.getConfig().disableForceKryo(); + config.disableAutoTypeRegistration(); + config.disableForceAvro(); + config.disableForceKryo(); ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); + config.setGlobalJobParameters(parameters); // integration tests run with with object reuse both disabled and enabled if (parameters.has("__disable_object_reuse")) { - env.getConfig().disableObjectReuse(); + config.disableObjectReuse(); } else { - env.getConfig().enableObjectReuse(); + config.enableObjectReuse(); } // Usage http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java index 742c1de..8bf9268 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java @@ -20,9 +20,11 @@ package org.apache.flink.graph.drivers; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.DoubleParameter; import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.library.similarity.AdamicAdar.Result; import org.apache.flink.types.CopyableValue; @@ -33,8 +35,16 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}. */ public class AdamicAdar<K extends CopyableValue<K>, VV, EV> -extends SimpleDriver<Result<K>> -implements Driver<K, VV, EV>, CSV, Print { +extends SimpleDriver<K, VV, EV, Result<K>> +implements CSV, Print { + + private DoubleParameter minRatio = new DoubleParameter(this, "minimum_ratio") + .setDefaultValue(0.0) + .setMinimumValue(0.0, true); + + private DoubleParameter minScore = new DoubleParameter(this, "minimum_score") + .setDefaultValue(0.0) + .setMinimumValue(0.0, true); private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); @@ -61,11 +71,13 @@ implements Driver<K, VV, EV>, CSV, Print { } @Override - public void plan(Graph<K, VV, EV> graph) throws Exception { + protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception { int lp = littleParallelism.getValue().intValue(); - result = graph + return graph .run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>() + .setMinimumRatio(minRatio.getValue().floatValue()) + .setMinimumScore(minScore.getValue().floatValue()) .setLittleParallelism(lp)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java index c463c0a..4958b5a 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java @@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.asm.result.PrintableResult; @@ -43,8 +44,8 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient */ public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends SimpleDriver<PrintableResult> -implements Driver<K, VV, EV>, CSV, Hash, Print { +extends SimpleDriver<K, VV, EV, PrintableResult> +implements CSV, Hash, Print { private static final String DIRECTED = "directed"; @@ -85,15 +86,11 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { } @Override - public void plan(Graph<K, VV, EV> graph) throws Exception { + protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) throws Exception { int lp = littleParallelism.getValue().intValue(); switch (order.getValue()) { case DIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>() - .setLittleParallelism(lp)); - globalClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>() .setLittleParallelism(lp)); @@ -101,13 +98,14 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { averageClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>() .setLittleParallelism(lp)); - break; - case UNDIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>() + @SuppressWarnings("unchecked") + DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>() .setLittleParallelism(lp)); + return directedResult; + case UNDIRECTED: globalClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>() .setLittleParallelism(lp)); @@ -115,7 +113,15 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { averageClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>() .setLittleParallelism(lp)); - break; + + @SuppressWarnings("unchecked") + DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>() + .setLittleParallelism(lp)); + return undirectedResult; + + default: + throw new RuntimeException("Unknown order: " + order); } } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java index 32263cf..95904d8 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java @@ -78,12 +78,9 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { @Override public void print(String executionName) throws Exception { - Collect<Vertex<K, K>> collector = new Collect<>(); + List<Vertex<K, K>> results = new Collect<Vertex<K, K>>().run(components).execute(executionName); - // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 - List<Vertex<K, K>> records = collector.run(components).execute(executionName); - - for (Vertex<K, K> result : records) { + for (Vertex<K, K> result : results) { System.out.println(result); } } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java index 524e70f..5da0284 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java @@ -82,10 +82,7 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { @Override public void print(String executionName) throws Exception { - Collect<Edge<K, EV>> collector = new Collect<>(); - - // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 - List<Edge<K, EV>> records = collector.run(edges).execute(executionName); + List<Edge<K, EV>> records = new Collect<Edge<K, EV>>().run(edges).execute(executionName); if (hasNullValueEdges(edges)) { for (Edge<K, EV> result : records) { http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java index 6081fea..209cddf 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java @@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Print; @@ -30,8 +31,8 @@ import org.apache.flink.graph.library.link_analysis.HITS.Result; * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}. */ public class HITS<K, VV, EV> -extends SimpleDriver<Result<K>> -implements Driver<K, VV, EV>, CSV, Print { +extends SimpleDriver<K, VV, EV, Result<K>> +implements CSV, Print { private static final int DEFAULT_ITERATIONS = 10; @@ -59,8 +60,8 @@ implements Driver<K, VV, EV>, CSV, Print { } @Override - public void plan(Graph<K, VV, EV> graph) throws Exception { - result = graph + protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception { + return graph .run(new org.apache.flink.graph.library.link_analysis.HITS<K, VV, EV>( iterationConvergence.getValue().iterations, iterationConvergence.getValue().convergenceThreshold)); http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java index 1c836ea..ae0d5f8 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java @@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Hash; @@ -34,8 +35,24 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}. */ public class JaccardIndex<K extends CopyableValue<K>, VV, EV> -extends SimpleDriver<Result<K>> -implements Driver<K, VV, EV>, CSV, Hash, Print { +extends SimpleDriver<K, VV, EV, Result<K>> +implements CSV, Hash, Print { + + private LongParameter minNumerator = new LongParameter(this, "minimum_numerator") + .setDefaultValue(0) + .setMinimumValue(0); + + private LongParameter minDenominator = new LongParameter(this, "minimum_denominator") + .setDefaultValue(1) + .setMinimumValue(1); + + private LongParameter maxNumerator = new LongParameter(this, "maximum_numerator") + .setDefaultValue(1) + .setMinimumValue(0); + + private LongParameter maxDenominator = new LongParameter(this, "maximum_denominator") + .setDefaultValue(1) + .setMinimumValue(1); private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); @@ -64,11 +81,13 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { } @Override - public void plan(Graph<K, VV, EV> graph) throws Exception { + protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception { int lp = littleParallelism.getValue().intValue(); - result = graph + return graph .run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>() + .setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue()) + .setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue()) .setLittleParallelism(lp)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java index 8cef077..5d74bdb 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.drivers; import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Print; @@ -30,8 +31,8 @@ import org.apache.flink.graph.library.link_analysis.PageRank.Result; * @see org.apache.flink.graph.library.link_analysis.PageRank */ public class PageRank<K, VV, EV> -extends SimpleDriver<Result<K>> -implements Driver<K, VV, EV>, CSV, Print { +extends SimpleDriver<K, VV, EV, Result<K>> +implements CSV, Print { private static final int DEFAULT_ITERATIONS = 10; @@ -64,8 +65,8 @@ implements Driver<K, VV, EV>, CSV, Print { } @Override - public void plan(Graph<K, VV, EV> graph) throws Exception { - result = graph + protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception { + return graph .run(new org.apache.flink.graph.library.link_analysis.PageRank<K, VV, EV>( dampingFactor.getValue(), iterationConvergence.getValue().iterations, http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java index 98bdfc5..5cecca1 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.drivers; import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.dataset.ChecksumHashCode; import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.asm.dataset.Collect; @@ -33,30 +34,74 @@ import java.util.List; * * @param <R> algorithm's result type */ -public abstract class SimpleDriver<R extends PrintableResult> -extends ParameterizedBase { +public abstract class SimpleDriver<K, VV, EV, R extends PrintableResult> +extends ParameterizedBase +implements Driver<K, VV, EV> { - protected DataSet<? extends R> result; + private DataSet<R> result; + protected DataSet<R> getResult() { + return result; + } + + /** + * Plan the algorithm and return the result {@link DataSet}. + * + * @param graph input graph + * @return driver output + * @throws Exception on error + */ + protected abstract DataSet<R> simplePlan(Graph<K, VV, EV> graph) throws Exception; + + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + result = simplePlan(graph); + } + + /** + * Print hash of execution results. + * + * Does *not* implement/override {@code Hash} since {@link Driver} + * implementations designate the appropriate outputs. + * + * @param executionName job name + * @throws Exception on error + */ public void hash(String executionName) throws Exception { Checksum checksum = new ChecksumHashCode<R>() - .run((DataSet<R>) result) + .run(result) .execute(executionName); System.out.println(checksum); } + /** + * Print execution results. + * + * Does *not* implement/override {@code Print} since {@link Driver} + * implementations designate the appropriate outputs. + * + * @param executionName job name + * @throws Exception on error + */ public void print(String executionName) throws Exception { - Collect<R> collector = new Collect<>(); - - // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 - List<R> records = collector.run((DataSet<R>) result).execute(executionName); + List<R> results = new Collect<R>().run(result).execute(executionName); - for (R result : records) { + for (R result : results) { System.out.println(result.toPrintableString()); } } + /** + * Write execution results to file using CSV format. + * + * Does *not* implement/override {@code CSV} since {@link Driver} + * implementations designate the appropriate outputs. + * + * @param filename output filename + * @param lineDelimiter CSV delimiter between lines + * @param fieldDelimiter CSV delimiter between fields + */ public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { result .writeAsCsv(filename, lineDelimiter, fieldDelimiter) http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java index ca0c167..5157b8e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java @@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.asm.result.PrintableResult; @@ -42,8 +43,8 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus */ public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV> -extends SimpleDriver<PrintableResult> -implements Driver<K, VV, EV>, CSV, Hash, Print { +extends SimpleDriver<K, VV, EV, PrintableResult> +implements CSV, Hash, Print { private static final String DIRECTED = "directed"; @@ -83,35 +84,40 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { } @Override - public void plan(Graph<K, VV, EV> graph) throws Exception { + protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) throws Exception { int lp = littleParallelism.getValue().intValue(); switch (order.getValue()) { case DIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>() - .setSortTriangleVertices(sortTriangleVertices.getValue()) - .setLittleParallelism(lp)); - if (computeTriadicCensus.getValue()) { triadicCensus = graph .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>() .setLittleParallelism(lp)); } - break; - case UNDIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>() + @SuppressWarnings("unchecked") + DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>() .setSortTriangleVertices(sortTriangleVertices.getValue()) .setLittleParallelism(lp)); + return directedResult; + case UNDIRECTED: if (computeTriadicCensus.getValue()) { triadicCensus = graph .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>() .setLittleParallelism(lp)); } - break; + + @SuppressWarnings("unchecked") + DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>() + .setSortTriangleVertices(sortTriangleVertices.getValue()) + .setLittleParallelism(lp)); + return undirectedResult; + + default: + throw new RuntimeException("Unknown order: " + order); } } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java index e4e6a4c..d64534b 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java @@ -96,6 +96,9 @@ implements Input<K, NullValue, NullValue> { .setMinimumValue(0.0, true) .setMaximumValue(2.0, true); + private LongParameter seed = new LongParameter(this, "seed") + .setDefaultValue(JDKRandomGeneratorFactory.DEFAULT_SEED); + private Simplify simplify = new Simplify(this); private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index a27ca29..f73d37b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -21,11 +21,11 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeOrder; @@ -118,7 +118,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input) throws Exception { // s, t, bitmask - DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges() + DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges() .flatMap(new EmitAndFlipEdge<K, EV>()) .setParallelism(parallelism) .name("Emit and flip edge") @@ -128,9 +128,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { .name("Reduce bitmask"); // s, d(s) - DataSet<Vertex<K, Degrees>> vertexDegrees = edgesWithOrder + DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder .groupBy(0) - .sortGroup(1, Order.ASCENDING) .reduceGroup(new DegreeCount<K>()) .setParallelism(parallelism) .name("Degree count"); @@ -178,22 +177,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { * * @param <T> ID type */ - @ForwardedFields("0; 1") + @ForwardedFields("0") private static final class ReduceBitmask<T> - implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> { + implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple2<T, ByteValue>> { + private Tuple2<T, ByteValue> output = new Tuple2<>(null, new ByteValue()); + @Override - public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out) + public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple2<T, ByteValue>> out) throws Exception { - Tuple3<T, T, ByteValue> output = null; - byte bitmask = 0; for (Tuple3<T, T, ByteValue> value: values) { - output = value; + output.f0 = value.f0; bitmask |= value.f2.getValue(); } - output.f2.setValue(bitmask); + output.f1.setValue(bitmask); out.collect(output); } } @@ -203,21 +202,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { * * @param <T> ID type */ + @ForwardedFields("0") private static class DegreeCount<T> - implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Vertex<T, Degrees>> { + implements GroupReduceFunction<Tuple2<T, ByteValue>, Vertex<T, Degrees>> { private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees()); @Override - public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Vertex<T, Degrees>> out) + public void reduce(Iterable<Tuple2<T, ByteValue>> values, Collector<Vertex<T, Degrees>> out) throws Exception { long degree = 0; long outDegree = 0; long inDegree = 0; - for (Tuple3<T, T, ByteValue> edge : values) { + for (Tuple2<T, ByteValue> edge : values) { output.f0 = edge.f0; - byte bitmask = edge.f2.getValue(); + byte bitmask = edge.f1.getValue(); degree++; http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java index 5bb2f4c..a7d6ef1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java @@ -25,13 +25,15 @@ import org.apache.flink.types.DoubleValue; class Functions { + private Functions() {} + /** * Sum vertices' scores. * * @param <T> ID type */ @ForwardedFields("0") - static class SumScore<T> + protected static final class SumScore<T> implements ReduceFunction<Tuple2<T, DoubleValue>> { @Override public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right) http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java index 747735e..c5c4178 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java @@ -45,6 +45,7 @@ import org.apache.flink.graph.asm.result.UnaryResult; import org.apache.flink.graph.library.link_analysis.Functions.SumScore; import org.apache.flink.graph.library.link_analysis.PageRank.Result; import org.apache.flink.graph.utils.GraphUtils; +import org.apache.flink.graph.utils.GraphUtils.IdentityMapper; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.DoubleValue; @@ -175,6 +176,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .run(new VertexDegrees<K, VV, EV>() .setParallelism(parallelism)); + // prevent Exception "The dam has been closed." in TempBarrier + // for a simplified Graph as in PageRankITCase (see FLINK-5623) + vertexDegree = vertexDegree.map(new IdentityMapper<Vertex<K, Degrees>>()); + // vertex count DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 2f4516a..bc3cb86 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -78,7 +78,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { private int maximumScoreNumerator = 1; - private int maximumScoreDenominator = 0; + private int maximumScoreDenominator = 1; private int littleParallelism = PARALLELISM_DEFAULT; @@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { } /** - * Filter out Jaccard Index scores greater than or equal to the given maximum fraction. + * Filter out Jaccard Index scores greater than the given maximum fraction. * * @param numerator numerator of the maximum score * @param denominator denominator of the maximum score @@ -253,6 +253,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { * number of groups and {@link GenerateGroups} emits each edge into each group. * * @param <T> ID type + * @param <ET> edge value type */ @ForwardedFields("0->1; 1->2") private static class GenerateGroupSpans<T, ET> @@ -439,7 +440,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { if (unboundedScores || (count * minimumScoreDenominator >= distinctNeighbors * minimumScoreNumerator - && count * maximumScoreDenominator < distinctNeighbors * maximumScoreNumerator)) { + && count * maximumScoreDenominator <= distinctNeighbors * maximumScoreNumerator)) { output.f0 = edge.f0; output.f1 = edge.f1; output.f2.setValue(count); http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java index ee9b770..4ba8e39 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java @@ -30,6 +30,7 @@ import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; +import org.apache.flink.util.Preconditions; import java.util.HashMap; import java.util.Map; @@ -54,7 +55,7 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem public ValueArrayTypeInfo(TypeInformation<T> valueType) { this.valueType = valueType; - this.type = valueType.getTypeClass(); + this.type = valueType == null ? null : valueType.getTypeClass(); } @Override @@ -85,12 +86,16 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem @Override public boolean isKeyType() { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + return Comparable.class.isAssignableFrom(type); } @Override @SuppressWarnings("unchecked") public TypeSerializer<ValueArray<T>> createSerializer(ExecutionConfig executionConfig) { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + if (IntValue.class.isAssignableFrom(type)) { return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new IntValueArraySerializer(); } else if (LongValue.class.isAssignableFrom(type)) { @@ -107,6 +112,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public TypeComparator<ValueArray<T>> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + if (IntValue.class.isAssignableFrom(type)) { return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new IntValueArrayComparator(sortOrderAscending); } else if (LongValue.class.isAssignableFrom(type)) { @@ -131,6 +138,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem @Override public int hashCode() { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + return type.hashCode(); } @@ -154,6 +163,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem @Override public String toString() { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + return "ValueArrayType<" + type.getSimpleName() + ">"; } } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java index 2145c3d..1cfb415 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java @@ -36,6 +36,9 @@ public class ValueArrayTypeInfoFactory<T> extends TypeInfoFactory<ValueArray<T>> @Override public TypeInformation<ValueArray<T>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) { - return new ValueArrayTypeInfo(genericParameters.get("T")); + @SuppressWarnings("unchecked") + TypeInformation<ValueArray<T>> typeInfo = new ValueArrayTypeInfo(genericParameters.get("T")); + + return typeInfo; } } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java index 78fb378..5292751 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java @@ -22,8 +22,10 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.asm.translate.TranslateFunction; import org.apache.flink.types.LongValue; import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO; @@ -51,6 +53,7 @@ public class GraphUtils { * * @param <T> element type */ + @ForwardedFields("*") public static final class IdentityMapper<T> implements MapFunction<T, T> { public T map(T value) { @@ -59,13 +62,27 @@ public class GraphUtils { } /** + * The identity mapper returns the input as output. + * + * This does not forward fields and is used to break an operator chain. + * + * @param <T> element type + */ + public static final class NonForwardingIdentityMapper<T> + implements MapFunction<T, T> { + public T map(T value) { + return value; + } + } + + /** * Map each element to a value. * * @param <I> input type * @param <O> output type */ public static class MapTo<I, O> - implements MapFunction<I, O>, ResultTypeQueryable<O> { + implements MapFunction<I, O>, ResultTypeQueryable<O>, TranslateFunction<I, O> { private final O value; /** @@ -78,7 +95,13 @@ public class GraphUtils { } @Override - public O map(I o) throws Exception { + public O map(I input) throws Exception { + return value; + } + + @Override + public O translate(I input, O reuse) + throws Exception { return value; } http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java index 128ee70..2443359 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java @@ -81,6 +81,8 @@ extends AsmTestBase { String expectedResult = "(0,1,1,4)\n" + "(0,2,1,4)\n" + + "(0,3,2,4)\n" + + "(1,2,2,4)\n" + "(1,3,1,6)\n" + "(1,4,1,3)\n" + "(1,5,1,3)\n" +
