Repository: flink Updated Branches: refs/heads/master fa1498616 -> baf057a48
[hotfix] [gelly] Driver usage and configuration Fixes driver usages to print error messages. Registers user command-line parameters for web UI configuration. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baf057a4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baf057a4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baf057a4 Branch: refs/heads/master Commit: baf057a4815ebee67f439a55074280fb9ac48aaf Parents: fa14986 Author: Greg Hogan <[email protected]> Authored: Wed Oct 26 12:06:43 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Wed Oct 26 12:06:43 2016 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Usage.java | 25 ++++++-- .../graph/drivers/ClusteringCoefficient.java | 15 +++-- .../apache/flink/graph/drivers/Graph500.java | 1 + .../flink/graph/drivers/GraphMetrics.java | 4 ++ .../org/apache/flink/graph/drivers/HITS.java | 4 ++ .../flink/graph/drivers/JaccardIndex.java | 7 ++- .../flink/graph/drivers/TriangleListing.java | 62 ++++++++++++++------ .../annotate/directed/VertexInDegree.java | 2 +- .../annotate/directed/VertexOutDegree.java | 2 +- .../annotate/undirected/VertexDegree.java | 2 +- .../directed/LocalClusteringCoefficient.java | 2 +- .../undirected/LocalClusteringCoefficient.java | 2 +- .../flink/graph/library/link_analysis/HITS.java | 10 ++-- .../library/metric/directed/EdgeMetrics.java | 2 +- .../library/metric/undirected/EdgeMetrics.java | 2 +- 15 files changed, 100 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java index 9d8f116..d923bf0 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java @@ -18,6 +18,9 @@ package org.apache.flink.graph; +import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.client.program.ProgramParametrizationException; + /** * This default main class prints usage listing available classes. */ @@ -45,16 +48,26 @@ public class Usage { org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class, }; - public static void main(String[] args) throws Exception { - System.out.println("Driver classes call algorithms from the Gelly library:"); + private static String getUsage() { + StrBuilder strBuilder = new StrBuilder(); + + strBuilder.appendNewLine(); + strBuilder.appendln("Driver classes call algorithms from the Gelly library:"); for (Class cls : DRIVERS) { - System.out.println(" " + cls.getName()); + strBuilder.append(" ").appendln(cls.getName()); } - System.out.println(""); - System.out.println("Example classes illustrate Gelly APIs or alternative algorithms:"); + strBuilder.appendNewLine(); + strBuilder.appendln("Example classes illustrate Gelly APIs or alternative algorithms:"); for (Class cls : EXAMPLES) { - System.out.println(" " + cls.getName()); + strBuilder.append(" ").appendln(cls.getName()); } + + return strBuilder.toString(); + } + + public static void main(String[] args) throws Exception { + // this exception is thrown to prevent Flink from printing an error message + throw new ProgramParametrizationException(getUsage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java index 18b0406..cd28ee4 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java @@ -87,6 +87,8 @@ public class ClusteringCoefficient { .appendln(" --output print") .appendln(" --output hash") .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") + .appendNewLine() + .appendln("Usage error: " + message) .toString(); } @@ -96,6 +98,7 @@ public class ClusteringCoefficient { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + env.getConfig().setGlobalJobParameters(parameters); if (! 
parameters.has("directed")) { throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); @@ -131,7 +134,8 @@ public class ClusteringCoefficient { if (directedAlgorithm) { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() + .setParallelism(little_parallelism)); } gcc = graph @@ -146,7 +150,8 @@ public class ClusteringCoefficient { } else { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false) + .setParallelism(little_parallelism)); } gcc = graph @@ -168,7 +173,8 @@ public class ClusteringCoefficient { if (directedAlgorithm) { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>() + .setParallelism(little_parallelism)); } gcc = graph @@ -183,7 +189,8 @@ public class ClusteringCoefficient { } else { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false) + .setParallelism(little_parallelism)); } gcc = graph http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java index 8f9a54a..51ef66f 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java @@ -82,6 +82,7 @@ public class Graph500 { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + env.getConfig().setGlobalJobParameters(parameters); // Generate RMat graph int scale = parameters.getInt("scale", DEFAULT_SCALE); http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java index 4fb11c3..899ae66 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java @@ -68,6 +68,8 @@ public class GraphMetrics { .appendln("options:") .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") + .appendNewLine() + .appendln("Usage error: " + message) .toString(); } @@ -77,6 +79,8 @@ public class GraphMetrics { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + env.getConfig().setGlobalJobParameters(parameters); + if (! 
parameters.has("directed")) { throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); } http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java index e0a233a..b035bd7 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java @@ -78,6 +78,8 @@ public class HITS { .appendln(" --output print") .appendln(" --output hash") .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") + .appendNewLine() + .appendln("Usage error: " + message) .toString(); } @@ -87,6 +89,8 @@ public class HITS { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + env.getConfig().setGlobalJobParameters(parameters); + int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS); DataSet hits; http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java index 5c173e0..cb11af9 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java +++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java @@ -95,6 +95,7 @@ public class JaccardIndex { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + env.getConfig().setGlobalJobParameters(parameters); int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); @@ -121,7 +122,8 @@ public class JaccardIndex { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false) + .setParallelism(little_parallelism)); } ji = graph @@ -135,7 +137,8 @@ public class JaccardIndex { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>() + .setParallelism(little_parallelism)); } ji = graph http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java index 954f732..92f6a2c 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java @@ -44,6 +44,8 @@ import org.apache.flink.types.StringValue; import java.text.NumberFormat; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + 
/** * Driver for the library implementation of Triangle Listing. * @@ -79,6 +81,8 @@ public class TriangleListing { .appendln(" --output print") .appendln(" --output hash") .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") + .appendNewLine() + .appendln("Usage error: " + message) .toString(); } @@ -88,11 +92,15 @@ public class TriangleListing { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + env.getConfig().setGlobalJobParameters(parameters); + if (! parameters.has("directed")) { throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); } boolean directedAlgorithm = parameters.getBoolean("directed"); + int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); + DataSet tl; switch (parameters.get("input", "")) { @@ -117,19 +125,23 @@ public class TriangleListing { if (directedAlgorithm) { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() + .setParallelism(little_parallelism)); } tl = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } else { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false) + .setParallelism(little_parallelism)); } tl = graph - .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } } break; @@ -140,19 +152,23 @@ public class TriangleListing { if (directedAlgorithm) { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>() + .setParallelism(little_parallelism)); } tl = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } else { if (parameters.getBoolean("simplify", false)) { graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false) + .setParallelism(little_parallelism)); } tl = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } } break; @@ -178,13 +194,18 @@ public class TriangleListing { if (directedAlgorithm) { if (scale > 32) { tl = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>() + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } else { tl = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>()); + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>() + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } } else { boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); @@ -194,13 +215,18 @@ public class TriangleListing { if (scale > 32) { tl = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)) - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } else { tl = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)) - .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()); + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } } } break; http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index 934c4ed..5fdd8f9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { DataSet<Vertex<K, LongValue>> targetDegree = targetIds .groupBy(0) .reduce(new DegreeCount<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index a8745ca..8e3e9c6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds .groupBy(0) .reduce(new DegreeCount<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index f466f85..b731548 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -145,7 +145,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { DataSet<Vertex<K, LongValue>> degree = vertexIds .groupBy(0) .reduce(new DegreeCount<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index 608500b..93fb678 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -151,7 +151,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices .groupBy(0) .reduce(new CountTriangles<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .name("Count triangles"); // u, deg(u) http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 3621156..b22a0ce 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -151,7 +151,7 @@ extends 
GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices .groupBy(0) .reduce(new CountTriangles<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .name("Count triangles"); // u, deg(u) http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 9e3511c..1be55f0 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -171,7 +171,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .name("Initial scores") .groupBy(0) .reduce(new SumScores<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -188,7 +188,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .name("Hub") .groupBy(0) .reduce(new SumScore<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -198,7 +198,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .setParallelism(parallelism) .name("Square") .reduce(new Sum()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -212,7 +212,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .name("Authority") .groupBy(0) .reduce(new SumScore<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -222,7 +222,7 @@ 
extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .setParallelism(parallelism) .name("Square") .reduce(new Sum()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java index b3e1e30..648fb76 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java @@ -117,7 +117,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .name("Reduce edge stats") .groupBy(0) .reduce(new SumEdgeStats<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum edge stats"); http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java index 6bce42c..1c636ff 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java @@ -123,7 +123,7 @@ extends 
AbstractGraphAnalytic<K, VV, EV, Result> { .name("Edge stats") .groupBy(0) .reduce(new SumEdgeStats<K>()) - .setCombineHint(CombineHint.HASH) + .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum edge stats");
