[FLINK-4571] [gelly] Configurable little parallelism in Gelly drivers

This closes #2475


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd3c0d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd3c0d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd3c0d9

Branch: refs/heads/master
Commit: bdd3c0d94b2a6cdecb482ee3fdefe082fc1b7c4d
Parents: 8210ff4
Author: Greg Hogan <[email protected]>
Authored: Fri Sep 2 11:53:08 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Thu Sep 8 17:06:29 2016 -0400

----------------------------------------------------------------------
 .../graph/examples/ClusteringCoefficient.java   | 72 ++++++++++++++------
 .../flink/graph/examples/JaccardIndex.java      | 30 +++++---
 .../annotate/directed/EdgeDegreesPair.java      |  3 +-
 .../annotate/directed/EdgeSourceDegrees.java    |  3 +-
 .../annotate/directed/EdgeTargetDegrees.java    |  3 +-
 .../degree/annotate/directed/VertexDegrees.java |  5 +-
 .../annotate/directed/VertexInDegree.java       |  7 +-
 .../annotate/directed/VertexOutDegree.java      |  7 +-
 .../annotate/undirected/EdgeDegreePair.java     |  3 +-
 .../annotate/undirected/EdgeSourceDegree.java   |  3 +-
 .../annotate/undirected/EdgeTargetDegree.java   |  3 +-
 .../annotate/undirected/VertexDegree.java       |  7 +-
 .../degree/filter/undirected/MaximumDegree.java |  3 +-
 .../graph/asm/simple/directed/Simplify.java     |  3 +-
 .../graph/asm/simple/undirected/Simplify.java   |  3 +-
 .../asm/translate/TranslateEdgeValues.java      |  3 +-
 .../graph/asm/translate/TranslateGraphIds.java  |  3 +-
 .../asm/translate/TranslateVertexValues.java    |  3 +-
 .../directed/LocalClusteringCoefficient.java    |  3 +-
 .../clustering/directed/TriangleListing.java    |  5 +-
 .../undirected/LocalClusteringCoefficient.java  |  3 +-
 .../clustering/undirected/TriangleListing.java  |  5 +-
 .../flink/graph/library/link_analysis/HITS.java |  3 +-
 .../graph/library/similarity/AdamicAdar.java    |  3 +-
 .../graph/library/similarity/JaccardIndex.java  |  3 +-
 25 files changed, 124 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index e099e2b..f4b1ecf 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -42,6 +42,8 @@ import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Driver for the library implementations of Global and Local Clustering 
Coefficient.
  *
@@ -89,12 +91,15 @@ public class ClusteringCoefficient {
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
+
                if (! parameters.has("directed")) {
                        printUsage();
                        return;
                }
                boolean directedAlgorithm = parameters.getBoolean("directed");
 
+               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
                // global and local clustering coefficient results
                GraphAnalytic gcc;
                DataSet lcc;
@@ -120,14 +125,18 @@ public class ClusteringCoefficient {
 
                                                if (directedAlgorithm) {
                                                        gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                        lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                } else {
                                                        gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                        lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                }
                                        } break;
 
@@ -137,14 +146,18 @@ public class ClusteringCoefficient {
 
                                                if (directedAlgorithm) {
                                                        gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                        lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                } else {
                                                        gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                        lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>());
+                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
+                                                                       
.setLittleParallelism(little_parallelism));
                                                }
                                        } break;
 
@@ -164,51 +177,66 @@ public class ClusteringCoefficient {
                                long edgeCount = vertexCount * edgeFactor;
 
                                Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                                       .setParallelism(little_parallelism)
                                        .generate();
 
                                if (directedAlgorithm) {
                                        if (scale > 32) {
                                                Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>());
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>()
+                                                               
.setParallelism(little_parallelism));
 
                                                gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                                lcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false));
+                                                               
.setIncludeZeroDegreeVertices(false)
+                                                               
.setLittleParallelism(little_parallelism));
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>());
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue())
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>()
+                                                               
.setParallelism(little_parallelism));
 
                                                gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                                lcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false));
+                                                               
.setIncludeZeroDegreeVertices(false)
+                                                               
.setLittleParallelism(little_parallelism));
                                        }
                                } else {
                                        boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
                                        if (scale > 32) {
                                                Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip)
+                                                               
.setParallelism(little_parallelism));
 
                                                gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                                lcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false));
+                                                               
.setIncludeZeroDegreeVertices(false)
+                                                               
.setLittleParallelism(little_parallelism));
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip));
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue())
+                                                               
.setParallelism(little_parallelism))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip)
+                                                               
.setParallelism(little_parallelism));
 
                                                gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                                lcc = newGraph
                                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false));
+                                                               
.setIncludeZeroDegreeVertices(false)
+                                                               
.setLittleParallelism(little_parallelism));
                                        }
                                }
                        } break;

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 824aab7..96f66ab 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -43,6 +43,8 @@ import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Driver for the library implementation of Jaccard Index.
  *
@@ -87,6 +89,8 @@ public class JaccardIndex {
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
 
+               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
                DataSet ji;
 
                switch (parameters.get("input", "")) {
@@ -107,13 +111,15 @@ public class JaccardIndex {
                                        case "integer": {
                                                ji = reader
                                                        
.keyType(LongValue.class)
-                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                        } break;
 
                                        case "string": {
                                                ji = reader
                                                        
.keyType(StringValue.class)
-                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, 
NullValue>());
+                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, 
NullValue>()
+                                                               
.setLittleParallelism(little_parallelism));
                                        } break;
 
                                        default:
@@ -131,20 +137,26 @@ public class JaccardIndex {
                                long vertexCount = 1L << scale;
                                long edgeCount = vertexCount * edgeFactor;
 
-                               boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
                                Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                                       .setParallelism(little_parallelism)
                                        .generate();
 
+                               boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
                                if (scale > 32) {
                                        ji = graph
-                                               .run(new Simplify<LongValue, 
NullValue, NullValue>(clipAndFlip))
-                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>());
+                                               .run(new Simplify<LongValue, 
NullValue, NullValue>(clipAndFlip)
+                                                       
.setParallelism(little_parallelism))
+                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>()
+                                                       
.setLittleParallelism(little_parallelism));
                                } else {
                                        ji = graph
-                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
-                                               .run(new Simplify<IntValue, 
NullValue, NullValue>(clipAndFlip))
-                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, 
NullValue>());
+                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue())
+                                                       
.setParallelism(little_parallelism))
+                                               .run(new Simplify<IntValue, 
NullValue, NullValue>(clipAndFlip)
+                                                       
.setParallelism(little_parallelism))
+                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, 
NullValue>()
+                                                       
.setLittleParallelism(little_parallelism));
                                }
                                } break;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 408516b..6f808f3 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -73,7 +73,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple3<EV, Degrees, Deg
 
                EdgeDegreesPair rhs = (EdgeDegreesPair) other;
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index e55e3c6..03fd1ba 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -72,7 +72,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple2<EV, Degrees>>> {
 
                EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index ed48f98..7526d00 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -72,7 +72,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple2<EV, Degrees>>> {
 
                EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index f4d734e..a27ca29 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -108,7 +108,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
Degrees>> {
                // merge configurations
 
                
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }
@@ -141,7 +142,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
Degrees>> {
                                .equalTo(0)
                                .with(new JoinVertexWithVertexDegrees<K, VV>())
                                        .setParallelism(parallelism)
-                                       .name("Join zero degree vertices");
+                                       .name("Zero degree vertices");
                }
 
                return vertexDegrees;

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 3f842a6..934c4ed 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -101,7 +101,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                // merge configurations
 
                
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }
@@ -114,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                        .getEdges()
                        .map(new MapEdgeToTargetId<K, EV>())
                                .setParallelism(parallelism)
-                               .name("Map edge to target ID");
+                               .name("Edge to target ID");
 
                // t, d(t)
                DataSet<Vertex<K, LongValue>> targetDegree = targetIds
@@ -131,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                                .equalTo(0)
                                .with(new JoinVertexWithVertexDegree<K, VV>())
                                        .setParallelism(parallelism)
-                                       .name("Join zero degree vertices");
+                                       .name("Zero degree vertices");
                }
 
                return targetDegree;

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 0ec4fc1..a8745ca 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -101,7 +101,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                // merge configurations
 
                
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }
@@ -114,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                        .getEdges()
                        .map(new MapEdgeToSourceId<K, EV>())
                                .setParallelism(parallelism)
-                               .name("Map edge to source ID");
+                               .name("Edge to source ID");
 
                // s, d(s)
                DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
@@ -131,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                                .equalTo(0)
                                .with(new JoinVertexWithVertexDegree<K, VV>())
                                        .setParallelism(parallelism)
-                                       .name("Join zero degree vertices");
+                                       .name("Zero degree vertices");
                }
 
                return sourceDegree;

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 09ef975..71b4891 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -95,7 +95,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple3<EV, LongValue, L
                EdgeDegreePair rhs = (EdgeDegreePair) other;
 
                reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 702fead..ee9a144 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple2<EV, LongValue>>>
                EdgeSourceDegree rhs = (EdgeSourceDegree) other;
 
                reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 724567e..1255d86 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple2<EV, LongValue>>>
                EdgeTargetDegree rhs = (EdgeTargetDegree) other;
 
                reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 0f753fc..f466f85 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -122,7 +122,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
 
                
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
                reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }
@@ -138,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                        .getEdges()
                        .map(mapEdgeToId)
                                .setParallelism(parallelism)
-                               .name("Map edge to vertex ID");
+                               .name("Edge to vertex ID");
 
                // v, deg(v)
                DataSet<Vertex<K, LongValue>> degree = vertexIds
@@ -156,7 +157,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                                .equalTo(0)
                                .with(new JoinVertexWithVertexDegree<K, VV>())
                                        .setParallelism(parallelism)
-                                       .name("Join zero degree vertices");
+                                       .name("Zero degree vertices");
                }
 
                return degree;

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index be19ffd..e5eea61 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -139,7 +139,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
                reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
                
broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 99ffe0d..3d1fcee 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -71,7 +71,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
                Simplify rhs = (Simplify) other;
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 45cd3f9..c3d8983 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -97,7 +97,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
                // merge configurations
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index bde826e..b2b7594 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 
                // merge configurations
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 2c67c5a..e079a41 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -95,7 +95,8 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> 
{
 
                // merge configurations
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 9e6784e..7447e11 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 
                // merge configurations
 
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 9d323a8..608500b 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -118,7 +118,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
 
                
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
 
-               littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
+               littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) 
? rhs.littleParallelism :
+                       ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 7df288a..e1b3040 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -113,7 +113,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                TriangleListing rhs = (TriangleListing) other;
 
                sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
-               littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
+               littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) 
? rhs.littleParallelism :
+                       ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
 
                return true;
        }
@@ -162,7 +163,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .groupBy(0)
                        .sortGroup(1, Order.ASCENDING)
                        .reduceGroup(new GenerateTriplets<K>())
-                               .setParallelism(littleParallelism)
                                .name("Generate triplets");
 
                // u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges 
in graph
@@ -171,7 +171,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .where(1, 2)
                        .equalTo(0, 1)
                        .with(new ProjectTriangles<K>())
-                               .setParallelism(littleParallelism)
                                .name("Triangle listing");
 
                if (sortTriangleVertices.get()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 293e3f9..3621156 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -118,7 +118,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                // merge configurations
 
                
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-               littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
+               littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) 
? rhs.littleParallelism :
+                       ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index c3dbf3e..8850125 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -114,7 +114,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, 
K, K>> {
                TriangleListing rhs = (TriangleListing) other;
 
                sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
-               littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
+               littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) 
? rhs.littleParallelism :
+                       ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
 
                return true;
        }
@@ -155,7 +156,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, 
K, K>> {
                        .groupBy(0)
                        .sortGroup(1, Order.ASCENDING)
                        .reduceGroup(new GenerateTriplets<K>())
-                               .setParallelism(littleParallelism)
                                .name("Generate triplets");
 
                // u, v, w where (u, v), (u, w), and (v, w) are edges in graph, 
v < w
@@ -164,7 +164,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, 
K, K>> {
                        .where(1, 2)
                        .equalTo(0, 1)
                        .with(new ProjectTriangles<K>())
-                               .setParallelism(littleParallelism)
                                .name("Triangle listing");
 
                if (sortTriangleVertices.get()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 7ba6fee..9e3511c 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -148,7 +148,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
 
                maxIterations = Math.max(maxIterations, rhs.maxIterations);
                convergenceThreshold = Math.min(convergenceThreshold, 
rhs.convergenceThreshold);
-               parallelism = Math.min(parallelism, rhs.parallelism);
+               parallelism = (parallelism == PARALLELISM_DEFAULT) ? 
rhs.parallelism :
+                       ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism 
: Math.min(parallelism, rhs.parallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 1514866..00819e4 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -151,7 +151,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
 
                // merge configurations
 
-               littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
+               littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) 
? rhs.littleParallelism :
+                       ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 1e406fa..148d541 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -181,7 +181,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                // merge configurations
 
                groupSize = Math.max(groupSize, rhs.groupSize);
-               littleParallelism = Math.min(littleParallelism, 
rhs.littleParallelism);
+               littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) 
? rhs.littleParallelism :
+                       ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
 
                return true;
        }

Reply via email to