[FLINK-4571] [gelly] Configurable little parallelism in Gelly drivers. This closes #2475.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd3c0d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd3c0d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd3c0d9 Branch: refs/heads/master Commit: bdd3c0d94b2a6cdecb482ee3fdefe082fc1b7c4d Parents: 8210ff4 Author: Greg Hogan <[email protected]> Authored: Fri Sep 2 11:53:08 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Thu Sep 8 17:06:29 2016 -0400 ---------------------------------------------------------------------- .../graph/examples/ClusteringCoefficient.java | 72 ++++++++++++++------ .../flink/graph/examples/JaccardIndex.java | 30 +++++--- .../annotate/directed/EdgeDegreesPair.java | 3 +- .../annotate/directed/EdgeSourceDegrees.java | 3 +- .../annotate/directed/EdgeTargetDegrees.java | 3 +- .../degree/annotate/directed/VertexDegrees.java | 5 +- .../annotate/directed/VertexInDegree.java | 7 +- .../annotate/directed/VertexOutDegree.java | 7 +- .../annotate/undirected/EdgeDegreePair.java | 3 +- .../annotate/undirected/EdgeSourceDegree.java | 3 +- .../annotate/undirected/EdgeTargetDegree.java | 3 +- .../annotate/undirected/VertexDegree.java | 7 +- .../degree/filter/undirected/MaximumDegree.java | 3 +- .../graph/asm/simple/directed/Simplify.java | 3 +- .../graph/asm/simple/undirected/Simplify.java | 3 +- .../asm/translate/TranslateEdgeValues.java | 3 +- .../graph/asm/translate/TranslateGraphIds.java | 3 +- .../asm/translate/TranslateVertexValues.java | 3 +- .../directed/LocalClusteringCoefficient.java | 3 +- .../clustering/directed/TriangleListing.java | 5 +- .../undirected/LocalClusteringCoefficient.java | 3 +- .../clustering/undirected/TriangleListing.java | 5 +- .../flink/graph/library/link_analysis/HITS.java | 3 +- .../graph/library/similarity/AdamicAdar.java | 3 +- .../graph/library/similarity/JaccardIndex.java | 3 +- 25 files changed, 124 insertions(+), 65 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java index e099e2b..f4b1ecf 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java @@ -42,6 +42,8 @@ import org.apache.flink.types.StringValue; import java.text.NumberFormat; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + /** * Driver for the library implementations of Global and Local Clustering Coefficient. * @@ -89,12 +91,15 @@ public class ClusteringCoefficient { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + if (! 
parameters.has("directed")) { printUsage(); return; } boolean directedAlgorithm = parameters.getBoolean("directed"); + int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); + // global and local clustering coefficient results GraphAnalytic gcc; DataSet lcc; @@ -120,14 +125,18 @@ public class ClusteringCoefficient { if (directedAlgorithm) { gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } else { gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } } break; @@ -137,14 +146,18 @@ public class ClusteringCoefficient { if (directedAlgorithm) { gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>() + 
.setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } else { gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } } break; @@ -164,51 +177,66 @@ public class ClusteringCoefficient { long edgeCount = vertexCount * edgeFactor; Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setParallelism(little_parallelism) .generate(); if (directedAlgorithm) { if (scale > 32) { Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } else { Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()); + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>() + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } } else { boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); if (scale > 32) { Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip) + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()); + .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } else { Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)); + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip) + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } } } break; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 824aab7..96f66ab 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -43,6 +43,8 @@ import org.apache.flink.types.StringValue; import java.text.NumberFormat; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + /** * Driver for the library implementation of Jaccard Index. * @@ -87,6 +89,8 @@ public class JaccardIndex { ParameterTool parameters = ParameterTool.fromArgs(args); + int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); + DataSet ji; switch (parameters.get("input", "")) { @@ -107,13 +111,15 @@ public class JaccardIndex { case "integer": { ji = reader .keyType(LongValue.class) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } break; case "string": { ji = reader .keyType(StringValue.class) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } break; default: @@ -131,20 +137,26 @@ public class JaccardIndex { long vertexCount = 1L << scale; long edgeCount = vertexCount * edgeFactor; - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setParallelism(little_parallelism) .generate(); + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + if (scale > 32) { ji = 
graph - .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()); + .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } else { ji = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) - .run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()); + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()) + .setParallelism(little_parallelism)) + .run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>() + .setLittleParallelism(little_parallelism)); } } break; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java index 408516b..6f808f3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java @@ -73,7 +73,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Deg EdgeDegreesPair rhs = 
(EdgeDegreesPair) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java index e55e3c6..03fd1ba 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java @@ -72,7 +72,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { EdgeSourceDegrees rhs = (EdgeSourceDegrees) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java index ed48f98..7526d00 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java @@ -72,7 +72,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { EdgeTargetDegrees rhs = (EdgeTargetDegrees) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index f4d734e..a27ca29 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -108,7 +108,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -141,7 +142,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { .equalTo(0) .with(new JoinVertexWithVertexDegrees<K, VV>()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return vertexDegrees; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index 3f842a6..934c4ed 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -101,7 +101,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -114,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .getEdges() .map(new MapEdgeToTargetId<K, EV>()) .setParallelism(parallelism) - .name("Map edge to target ID"); + .name("Edge to target ID"); // t, d(t) DataSet<Vertex<K, LongValue>> targetDegree = targetIds @@ -131,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .equalTo(0) .with(new JoinVertexWithVertexDegree<K, VV>()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return targetDegree; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index 0ec4fc1..a8745ca 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -101,7 +101,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -114,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .getEdges() .map(new MapEdgeToSourceId<K, EV>()) .setParallelism(parallelism) - .name("Map edge to source ID"); + .name("Edge to source ID"); // s, d(s) DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds @@ -131,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .equalTo(0) .with(new JoinVertexWithVertexDegree<K, VV>()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return sourceDegree; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index 09ef975..71b4891 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -95,7 +95,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L EdgeDegreePair rhs = (EdgeDegreePair) other; reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index 702fead..ee9a144 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> EdgeSourceDegree rhs = (EdgeSourceDegree) other; reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index 724567e..1255d86 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> EdgeTargetDegree rhs = (EdgeTargetDegree) other; reduceOnSourceId.mergeWith(rhs.reduceOnSourceId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index 0f753fc..f466f85 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -122,7 +122,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -138,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .getEdges() .map(mapEdgeToId) .setParallelism(parallelism) - .name("Map edge to vertex ID"); + .name("Edge to vertex ID"); // v, deg(v) DataSet<Vertex<K, LongValue>> degree = vertexIds @@ -156,7 +157,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .equalTo(0) .with(new JoinVertexWithVertexDegree<K, VV>()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return degree; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java index be19ffd..e5eea61 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java @@ -139,7 +139,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java index 99ffe0d..3d1fcee 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java @@ -71,7 +71,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { Simplify rhs = (Simplify) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java index 45cd3f9..c3d8983 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java @@ -97,7 +97,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java index bde826e..b2b7594 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java @@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java index 2c67c5a..e079a41 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java @@ -95,7 +95,8 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java index 9e6784e..7447e11 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java @@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index 9d323a8..608500b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -118,7 +118,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java index 7df288a..e1b3040 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -113,7 +113,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { TriangleListing rhs = (TriangleListing) other; sortTriangleVertices.mergeWith(rhs.sortTriangleVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } @@ -162,7 +163,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .groupBy(0) .sortGroup(1, Order.ASCENDING) .reduceGroup(new GenerateTriplets<K>()) - .setParallelism(littleParallelism) .name("Generate triplets"); // u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph @@ -171,7 +171,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .where(1, 2) .equalTo(0, 1) .with(new ProjectTriangles<K>()) - .setParallelism(littleParallelism) .name("Triangle listing"); if (sortTriangleVertices.get()) { http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 293e3f9..3621156 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -118,7 +118,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index c3dbf3e..8850125 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -114,7 +114,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> { TriangleListing rhs = (TriangleListing) other; sortTriangleVertices.mergeWith(rhs.sortTriangleVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } @@ -155,7 +156,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> { .groupBy(0) .sortGroup(1, Order.ASCENDING) .reduceGroup(new GenerateTriplets<K>()) - .setParallelism(littleParallelism) .name("Generate triplets"); // u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w @@ -164,7 +164,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> { .where(1, 2) .equalTo(0, 1) .with(new ProjectTriangles<K>()) - .setParallelism(littleParallelism) .name("Triangle listing"); if (sortTriangleVertices.get()) { http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 7ba6fee..9e3511c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -148,7 +148,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { maxIterations = Math.max(maxIterations, rhs.maxIterations); convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index 1514866..00819e4 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -151,7 +151,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // merge configurations - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? 
littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 1e406fa..148d541 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -181,7 +181,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // merge configurations groupSize = Math.max(groupSize, rhs.groupSize); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; }
