Repository: flink
Updated Branches:
  refs/heads/master fa1498616 -> baf057a48


[hotfix] [gelly] Driver usage and configuration

Fixes driver usages to print error messages.

Registers user command-line parameters for web UI configuration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baf057a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baf057a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baf057a4

Branch: refs/heads/master
Commit: baf057a4815ebee67f439a55074280fb9ac48aaf
Parents: fa14986
Author: Greg Hogan <[email protected]>
Authored: Wed Oct 26 12:06:43 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Wed Oct 26 12:06:43 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Usage.java | 25 ++++++--
 .../graph/drivers/ClusteringCoefficient.java    | 15 +++--
 .../apache/flink/graph/drivers/Graph500.java    |  1 +
 .../flink/graph/drivers/GraphMetrics.java       |  4 ++
 .../org/apache/flink/graph/drivers/HITS.java    |  4 ++
 .../flink/graph/drivers/JaccardIndex.java       |  7 ++-
 .../flink/graph/drivers/TriangleListing.java    | 62 ++++++++++++++------
 .../annotate/directed/VertexInDegree.java       |  2 +-
 .../annotate/directed/VertexOutDegree.java      |  2 +-
 .../annotate/undirected/VertexDegree.java       |  2 +-
 .../directed/LocalClusteringCoefficient.java    |  2 +-
 .../undirected/LocalClusteringCoefficient.java  |  2 +-
 .../flink/graph/library/link_analysis/HITS.java | 10 ++--
 .../library/metric/directed/EdgeMetrics.java    |  2 +-
 .../library/metric/undirected/EdgeMetrics.java  |  2 +-
 15 files changed, 100 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
index 9d8f116..d923bf0 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.graph;
 
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.client.program.ProgramParametrizationException;
+
 /**
  * This default main class prints usage listing available classes.
  */
@@ -45,16 +48,26 @@ public class Usage {
                
org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
        };
 
-       public static void main(String[] args) throws Exception {
-               System.out.println("Driver classes call algorithms from the 
Gelly library:");
+       private static String getUsage() {
+               StrBuilder strBuilder = new StrBuilder();
+
+               strBuilder.appendNewLine();
+               strBuilder.appendln("Driver classes call algorithms from the 
Gelly library:");
                for (Class cls : DRIVERS) {
-                       System.out.println("  " + cls.getName());
+                       strBuilder.append("  ").appendln(cls.getName());
                }
 
-               System.out.println("");
-               System.out.println("Example classes illustrate Gelly APIs or 
alternative algorithms:");
+               strBuilder.appendNewLine();
+               strBuilder.appendln("Example classes illustrate Gelly APIs or 
alternative algorithms:");
                for (Class cls : EXAMPLES) {
-                       System.out.println("  " + cls.getName());
+                       strBuilder.append("  ").appendln(cls.getName());
                }
+
+               return strBuilder.toString();
+       }
+
+       public static void main(String[] args) throws Exception {
+               // this exception is throw to prevent Flink from printing an 
error message
+               throw new ProgramParametrizationException(getUsage());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 18b0406..cd28ee4 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -87,6 +87,8 @@ public class ClusteringCoefficient {
                        .appendln("  --output print")
                        .appendln("  --output hash")
                        .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
+                       .appendNewLine()
+                       .appendln("Usage error: " + message)
                        .toString();
        }
 
@@ -96,6 +98,7 @@ public class ClusteringCoefficient {
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
+               env.getConfig().setGlobalJobParameters(parameters);
 
                if (! parameters.has("directed")) {
                        throw new 
ProgramParametrizationException(getUsage("must declare execution mode as 
'--directed true' or '--directed false'"));
@@ -131,7 +134,8 @@ public class ClusteringCoefficient {
                                                if (directedAlgorithm) {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>());
+                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>()
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        gcc = graph
@@ -146,7 +150,8 @@ public class ClusteringCoefficient {
                                                } else {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false));
+                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false)
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        gcc = graph
@@ -168,7 +173,8 @@ public class ClusteringCoefficient {
                                                if (directedAlgorithm) {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>());
+                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>()
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        gcc = graph
@@ -183,7 +189,8 @@ public class ClusteringCoefficient {
                                                } else {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false));
+                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false)
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        gcc = graph

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 8f9a54a..51ef66f 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -82,6 +82,7 @@ public class Graph500 {
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
+               env.getConfig().setGlobalJobParameters(parameters);
 
                // Generate RMat graph
                int scale = parameters.getInt("scale", DEFAULT_SCALE);

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 4fb11c3..899ae66 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -68,6 +68,8 @@ public class GraphMetrics {
                        .appendln("options:")
                        .appendln("  --input csv --type <integer | string> 
[--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter 
LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
                        .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
+                       .appendNewLine()
+                       .appendln("Usage error: " + message)
                        .toString();
        }
 
@@ -77,6 +79,8 @@ public class GraphMetrics {
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
+               env.getConfig().setGlobalJobParameters(parameters);
+
                if (! parameters.has("directed")) {
                        throw new 
ProgramParametrizationException(getUsage("must declare execution mode as 
'--directed true' or '--directed false'"));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index e0a233a..b035bd7 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -78,6 +78,8 @@ public class HITS {
                        .appendln("  --output print")
                        .appendln("  --output hash")
                        .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
+                       .appendNewLine()
+                       .appendln("Usage error: " + message)
                        .toString();
        }
 
@@ -87,6 +89,8 @@ public class HITS {
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
+               env.getConfig().setGlobalJobParameters(parameters);
+
                int iterations = parameters.getInt("iterations", 
DEFAULT_ITERATIONS);
 
                DataSet hits;

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 5c173e0..cb11af9 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -95,6 +95,7 @@ public class JaccardIndex {
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
+               env.getConfig().setGlobalJobParameters(parameters);
 
                int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
 
@@ -121,7 +122,8 @@ public class JaccardIndex {
 
                                                if 
(parameters.getBoolean("simplify", false)) {
                                                        graph = graph
-                                                               .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(false));
+                                                               .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(false)
+                                                                       
.setParallelism(little_parallelism));
                                                }
 
                                                ji = graph
@@ -135,7 +137,8 @@ public class JaccardIndex {
 
                                                if 
(parameters.getBoolean("simplify", false)) {
                                                        graph = graph
-                                                               .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, 
NullValue>());
+                                                               .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, 
NullValue>()
+                                                                       
.setParallelism(little_parallelism));
                                                }
 
                                                ji = graph

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 954f732..92f6a2c 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -44,6 +44,8 @@ import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Driver for the library implementation of Triangle Listing.
  *
@@ -79,6 +81,8 @@ public class TriangleListing {
                        .appendln("  --output print")
                        .appendln("  --output hash")
                        .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
+                       .appendNewLine()
+                       .appendln("Usage error: " + message)
                        .toString();
        }
 
@@ -88,11 +92,15 @@ public class TriangleListing {
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
+               env.getConfig().setGlobalJobParameters(parameters);
+
                if (! parameters.has("directed")) {
                        throw new 
ProgramParametrizationException(getUsage("must declare execution mode as 
'--directed true' or '--directed false'"));
                }
                boolean directedAlgorithm = parameters.getBoolean("directed");
 
+               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
                DataSet tl;
 
                switch (parameters.get("input", "")) {
@@ -117,19 +125,23 @@ public class TriangleListing {
                                                if (directedAlgorithm) {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>());
+                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>()
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                } else {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false));
+                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false)
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                }
                                        } break;
 
@@ -140,19 +152,23 @@ public class TriangleListing {
                                                if (directedAlgorithm) {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>());
+                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>()
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, 
NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, 
NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                } else {
                                                        if 
(parameters.getBoolean("simplify", false)) {
                                                                graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false));
+                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false)
+                                                                               
.setParallelism(little_parallelism));
                                                        }
 
                                                        tl = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                }
                                        } break;
 
@@ -178,13 +194,18 @@ public class TriangleListing {
                                if (directedAlgorithm) {
                                        if (scale > 32) {
                                                tl = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>())
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>()
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                        } else {
                                                tl = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>())
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, 
NullValue, NullValue>());
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>()
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, 
NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                        }
                                } else {
                                        boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
@@ -194,13 +215,18 @@ public class TriangleListing {
 
                                        if (scale > 32) {
                                                tl = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip))
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip)
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                        } else {
                                                tl = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip))
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>());
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip)
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                        }
                                }
                        } break;

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 934c4ed..5fdd8f9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                DataSet<Vertex<K, LongValue>> targetDegree = targetIds
                        .groupBy(0)
                        .reduce(new DegreeCount<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index a8745ca..8e3e9c6 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
                        .groupBy(0)
                        .reduce(new DegreeCount<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index f466f85..b731548 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -145,7 +145,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                DataSet<Vertex<K, LongValue>> degree = vertexIds
                        .groupBy(0)
                        .reduce(new DegreeCount<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 608500b..93fb678 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -151,7 +151,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                DataSet<Tuple2<K, LongValue>> vertexTriangleCount = 
triangleVertices
                        .groupBy(0)
                        .reduce(new CountTriangles<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .name("Count triangles");
 
                // u, deg(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 3621156..b22a0ce 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -151,7 +151,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                DataSet<Tuple2<K, LongValue>> vertexTriangleCount = 
triangleVertices
                        .groupBy(0)
                        .reduce(new CountTriangles<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .name("Count triangles");
 
                // u, deg(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 9e3511c..1be55f0 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -171,7 +171,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                                .name("Initial scores")
                        .groupBy(0)
                        .reduce(new SumScores<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 
@@ -188,7 +188,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                                .name("Hub")
                        .groupBy(0)
                        .reduce(new SumScore<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 
@@ -198,7 +198,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                                .setParallelism(parallelism)
                                .name("Square")
                        .reduce(new Sum())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 
@@ -212,7 +212,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                                .name("Authority")
                        .groupBy(0)
                        .reduce(new SumScore<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 
@@ -222,7 +222,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                                .setParallelism(parallelism)
                                .name("Square")
                        .reduce(new Sum())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index b3e1e30..648fb76 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -117,7 +117,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .name("Reduce edge stats")
                        .groupBy(0)
                        .reduce(new SumEdgeStats<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum edge stats");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 6bce42c..1c636ff 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -123,7 +123,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .name("Edge stats")
                        .groupBy(0)
                        .reduce(new SumEdgeStats<K>())
-                               .setCombineHint(CombineHint.HASH)
+                       .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum edge stats");
 

Reply via email to