Repository: flink
Updated Branches:
  refs/heads/master fb1ef0812 -> 59cf7032f


[hotfix] [gelly] Driver updates

- refactor SimpleDriver to call internal plan method
- add CLI parameters for RMatGraph, AdamicAdar, JaccardIndex
- remove unused data from VertexDegrees
- JaccardIndex now filters on > rather than >=
- handle null in ValueArrayTypeInfo
- add NonForwardingIdentityMapper to GraphUtils


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59cf7032
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59cf7032
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59cf7032

Branch: refs/heads/master
Commit: 59cf7032f3de35ba461d22411fa45f0286c62981
Parents: fb1ef08
Author: Greg Hogan <[email protected]>
Authored: Thu Apr 13 14:07:29 2017 -0400
Committer: Greg Hogan <[email protected]>
Committed: Wed Apr 19 11:44:55 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/flink/graph/Runner.java     | 14 +++--
 .../apache/flink/graph/drivers/AdamicAdar.java  | 20 +++++--
 .../graph/drivers/ClusteringCoefficient.java    | 30 ++++++----
 .../graph/drivers/ConnectedComponents.java      |  7 +--
 .../apache/flink/graph/drivers/EdgeList.java    |  5 +-
 .../org/apache/flink/graph/drivers/HITS.java    |  9 +--
 .../flink/graph/drivers/JaccardIndex.java       | 27 +++++++--
 .../apache/flink/graph/drivers/PageRank.java    |  9 +--
 .../flink/graph/drivers/SimpleDriver.java       | 63 +++++++++++++++++---
 .../flink/graph/drivers/TriangleListing.java    | 32 ++++++----
 .../flink/graph/drivers/input/RMatGraph.java    |  3 +
 .../degree/annotate/directed/VertexDegrees.java | 30 +++++-----
 .../graph/library/link_analysis/Functions.java  |  4 +-
 .../graph/library/link_analysis/PageRank.java   |  5 ++
 .../graph/library/similarity/JaccardIndex.java  |  7 ++-
 .../types/valuearray/ValueArrayTypeInfo.java    | 13 +++-
 .../valuearray/ValueArrayTypeInfoFactory.java   |  5 +-
 .../apache/flink/graph/utils/GraphUtils.java    | 27 ++++++++-
 .../library/similarity/JaccardIndexTest.java    |  2 +
 19 files changed, 224 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
index 4b6cf42..5ffe681 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph;
 
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -188,20 +189,21 @@ public class Runner {
        public static void main(String[] args) throws Exception {
                // Set up the execution environment
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               ExecutionConfig config = env.getConfig();
 
                // should not have any non-Flink data types
-               env.getConfig().disableAutoTypeRegistration();
-               env.getConfig().disableForceAvro();
-               env.getConfig().disableForceKryo();
+               config.disableAutoTypeRegistration();
+               config.disableForceAvro();
+               config.disableForceKryo();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
-               env.getConfig().setGlobalJobParameters(parameters);
+               config.setGlobalJobParameters(parameters);
 
                // integration tests run with object reuse both disabled 
and enabled
                if (parameters.has("__disable_object_reuse")) {
-                       env.getConfig().disableObjectReuse();
+                       config.disableObjectReuse();
                } else {
-                       env.getConfig().enableObjectReuse();
+                       config.enableObjectReuse();
                }
 
                // Usage

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
index 742c1de..8bf9268 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
@@ -20,9 +20,11 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.output.CSV;
 import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.DoubleParameter;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.types.CopyableValue;
@@ -33,8 +35,16 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
  */
 public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
-extends SimpleDriver<Result<K>>
-implements Driver<K, VV, EV>, CSV, Print {
+extends SimpleDriver<K, VV, EV, Result<K>>
+implements CSV, Print {
+
+       private DoubleParameter minRatio = new DoubleParameter(this, 
"minimum_ratio")
+               .setDefaultValue(0.0)
+               .setMinimumValue(0.0, true);
+
+       private DoubleParameter minScore = new DoubleParameter(this, 
"minimum_score")
+               .setDefaultValue(0.0)
+               .setMinimumValue(0.0, true);
 
        private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
                .setDefaultValue(PARALLELISM_DEFAULT);
@@ -61,11 +71,13 @@ implements Driver<K, VV, EV>, CSV, Print {
        }
 
        @Override
-       public void plan(Graph<K, VV, EV> graph) throws Exception {
+       protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws 
Exception {
                int lp = littleParallelism.getValue().intValue();
 
-               result = graph
+               return graph
                        .run(new 
org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
+                               
.setMinimumRatio(minRatio.getValue().floatValue())
+                               
.setMinimumScore(minScore.getValue().floatValue())
                                .setLittleParallelism(lp));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index c463c0a..4958b5a 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -43,8 +44,8 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @see 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
  */
 public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, 
VV, EV>
-extends SimpleDriver<PrintableResult>
-implements Driver<K, VV, EV>, CSV, Hash, Print {
+extends SimpleDriver<K, VV, EV, PrintableResult>
+implements CSV, Hash, Print {
 
        private static final String DIRECTED = "directed";
 
@@ -85,15 +86,11 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
        }
 
        @Override
-       public void plan(Graph<K, VV, EV> graph) throws Exception {
+       protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) 
throws Exception {
                int lp = littleParallelism.getValue().intValue();
 
                switch (order.getValue()) {
                        case DIRECTED:
-                               result = graph
-                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K,
 VV, EV>()
-                                               .setLittleParallelism(lp));
-
                                globalClusteringCoefficient = graph
                                        .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K,
 VV, EV>()
                                                .setLittleParallelism(lp));
@@ -101,13 +98,14 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
                                averageClusteringCoefficient = graph
                                        .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K,
 VV, EV>()
                                                .setLittleParallelism(lp));
-                               break;
 
-                       case UNDIRECTED:
-                               result = graph
-                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K,
 VV, EV>()
+                               @SuppressWarnings("unchecked")
+                               DataSet<PrintableResult> directedResult = 
(DataSet<PrintableResult>) (DataSet<?>) graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K,
 VV, EV>()
                                                .setLittleParallelism(lp));
+                               return directedResult;
 
+                       case UNDIRECTED:
                                globalClusteringCoefficient = graph
                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K,
 VV, EV>()
                                                .setLittleParallelism(lp));
@@ -115,7 +113,15 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
                                averageClusteringCoefficient = graph
                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K,
 VV, EV>()
                                                .setLittleParallelism(lp));
-                               break;
+
+                               @SuppressWarnings("unchecked")
+                               DataSet<PrintableResult> undirectedResult = 
(DataSet<PrintableResult>) (DataSet<?>) graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K,
 VV, EV>()
+                                               .setLittleParallelism(lp));
+                               return undirectedResult;
+
+                       default:
+                               throw new RuntimeException("Unknown order: " + 
order);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
index 32263cf..95904d8 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
@@ -78,12 +78,9 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
 
        @Override
        public void print(String executionName) throws Exception {
-               Collect<Vertex<K, K>> collector = new Collect<>();
+               List<Vertex<K, K>> results = new Collect<Vertex<K, 
K>>().run(components).execute(executionName);
 
-               // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761
-               List<Vertex<K, K>> records = 
collector.run(components).execute(executionName);
-
-               for (Vertex<K, K> result : records) {
+               for (Vertex<K, K> result : results) {
                        System.out.println(result);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
index 524e70f..5da0284 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -82,10 +82,7 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
 
        @Override
        public void print(String executionName) throws Exception {
-               Collect<Edge<K, EV>> collector = new Collect<>();
-
-               // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761
-               List<Edge<K, EV>> records = 
collector.run(edges).execute(executionName);
+               List<Edge<K, EV>> records = new Collect<Edge<K, 
EV>>().run(edges).execute(executionName);
 
                if (hasNullValueEdges(edges)) {
                        for (Edge<K, EV> result : records) {

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index 6081fea..209cddf 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.output.CSV;
 import org.apache.flink.graph.drivers.output.Print;
@@ -30,8 +31,8 @@ import 
org.apache.flink.graph.library.link_analysis.HITS.Result;
  * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}.
  */
 public class HITS<K, VV, EV>
-extends SimpleDriver<Result<K>>
-implements Driver<K, VV, EV>, CSV, Print {
+extends SimpleDriver<K, VV, EV, Result<K>>
+implements CSV, Print {
 
        private static final int DEFAULT_ITERATIONS = 10;
 
@@ -59,8 +60,8 @@ implements Driver<K, VV, EV>, CSV, Print {
        }
 
        @Override
-       public void plan(Graph<K, VV, EV> graph) throws Exception {
-               result = graph
+       protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws 
Exception {
+               return graph
                        .run(new 
org.apache.flink.graph.library.link_analysis.HITS<K, VV, EV>(
                                iterationConvergence.getValue().iterations,
                                
iterationConvergence.getValue().convergenceThreshold));

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 1c836ea..ae0d5f8 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.output.CSV;
 import org.apache.flink.graph.drivers.output.Hash;
@@ -34,8 +35,24 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
  */
 public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
-extends SimpleDriver<Result<K>>
-implements Driver<K, VV, EV>, CSV, Hash, Print {
+extends SimpleDriver<K, VV, EV, Result<K>>
+implements CSV, Hash, Print {
+
+       private LongParameter minNumerator = new LongParameter(this, 
"minimum_numerator")
+               .setDefaultValue(0)
+               .setMinimumValue(0);
+
+       private LongParameter minDenominator = new LongParameter(this, 
"minimum_denominator")
+               .setDefaultValue(1)
+               .setMinimumValue(1);
+
+       private LongParameter maxNumerator = new LongParameter(this, 
"maximum_numerator")
+               .setDefaultValue(1)
+               .setMinimumValue(0);
+
+       private LongParameter maxDenominator = new LongParameter(this, 
"maximum_denominator")
+               .setDefaultValue(1)
+               .setMinimumValue(1);
 
        private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
                .setDefaultValue(PARALLELISM_DEFAULT);
@@ -64,11 +81,13 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
        }
 
        @Override
-       public void plan(Graph<K, VV, EV> graph) throws Exception {
+       protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws 
Exception {
                int lp = littleParallelism.getValue().intValue();
 
-               result = graph
+               return graph
                        .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
+                               
.setMinimumScore(minNumerator.getValue().intValue(), 
minDenominator.getValue().intValue())
+                               
.setMaximumScore(maxNumerator.getValue().intValue(), 
maxDenominator.getValue().intValue())
                                .setLittleParallelism(lp));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
index 8cef077..5d74bdb 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.output.CSV;
 import org.apache.flink.graph.drivers.output.Print;
@@ -30,8 +31,8 @@ import 
org.apache.flink.graph.library.link_analysis.PageRank.Result;
  * @see org.apache.flink.graph.library.link_analysis.PageRank
  */
 public class PageRank<K, VV, EV>
-extends SimpleDriver<Result<K>>
-implements Driver<K, VV, EV>, CSV, Print {
+extends SimpleDriver<K, VV, EV, Result<K>>
+implements CSV, Print {
 
        private static final int DEFAULT_ITERATIONS = 10;
 
@@ -64,8 +65,8 @@ implements Driver<K, VV, EV>, CSV, Print {
        }
 
        @Override
-       public void plan(Graph<K, VV, EV> graph) throws Exception {
-               result = graph
+       protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws 
Exception {
+               return graph
                        .run(new 
org.apache.flink.graph.library.link_analysis.PageRank<K, VV, EV>(
                                dampingFactor.getValue(),
                                iterationConvergence.getValue().iterations,

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
index 98bdfc5..5cecca1 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.asm.dataset.Collect;
@@ -33,30 +34,74 @@ import java.util.List;
  *
  * @param <R> algorithm's result type
  */
-public abstract class SimpleDriver<R extends PrintableResult>
-extends ParameterizedBase {
+public abstract class SimpleDriver<K, VV, EV, R extends PrintableResult>
+extends ParameterizedBase
+implements Driver<K, VV, EV> {
 
-       protected DataSet<? extends R> result;
+       private DataSet<R> result;
 
+       protected DataSet<R> getResult() {
+               return result;
+       }
+
+       /**
+        * Plan the algorithm and return the result {@link DataSet}.
+        *
+        * @param graph input graph
+        * @return driver output
+        * @throws Exception on error
+        */
+       protected abstract DataSet<R> simplePlan(Graph<K, VV, EV> graph) throws 
Exception;
+
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               result = simplePlan(graph);
+       }
+
+       /**
+        * Print hash of execution results.
+        *
+        * Does *not* implement/override {@code Hash} since {@link Driver}
+        * implementations designate the appropriate outputs.
+        *
+        * @param executionName job name
+        * @throws Exception on error
+        */
        public void hash(String executionName) throws Exception {
                Checksum checksum = new ChecksumHashCode<R>()
-                       .run((DataSet<R>) result)
+                       .run(result)
                        .execute(executionName);
 
                System.out.println(checksum);
        }
 
+       /**
+        * Print execution results.
+        *
+        * Does *not* implement/override {@code Print} since {@link Driver}
+        * implementations designate the appropriate outputs.
+        *
+        * @param executionName job name
+        * @throws Exception on error
+        */
        public void print(String executionName) throws Exception {
-               Collect<R> collector = new Collect<>();
-
-               // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761
-               List<R> records = collector.run((DataSet<R>) 
result).execute(executionName);
+               List<R> results = new 
Collect<R>().run(result).execute(executionName);
 
-               for (R result : records) {
+               for (R result : results) {
                        System.out.println(result.toPrintableString());
                }
        }
 
+       /**
+        * Write execution results to file using CSV format.
+        *
+        * Does *not* implement/override {@code CSV} since {@link Driver}
+        * implementations designate the appropriate outputs.
+        *
+        * @param filename output filename
+        * @param lineDelimiter CSV delimiter between lines
+        * @param fieldDelimiter CSV delimiter between fields
+        */
        public void writeCSV(String filename, String lineDelimiter, String 
fieldDelimiter) {
                result
                        .writeAsCsv(filename, lineDelimiter, fieldDelimiter)

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index ca0c167..5157b8e 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -42,8 +43,8 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, 
EV>
-extends SimpleDriver<PrintableResult>
-implements Driver<K, VV, EV>, CSV, Hash, Print {
+extends SimpleDriver<K, VV, EV, PrintableResult>
+implements CSV, Hash, Print {
 
        private static final String DIRECTED = "directed";
 
@@ -83,35 +84,40 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
        }
 
        @Override
-       public void plan(Graph<K, VV, EV> graph) throws Exception {
+       protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) 
throws Exception {
                int lp = littleParallelism.getValue().intValue();
 
                switch (order.getValue()) {
                        case DIRECTED:
-                               result = graph
-                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
-                                               
.setSortTriangleVertices(sortTriangleVertices.getValue())
-                                               .setLittleParallelism(lp));
-
                                if (computeTriadicCensus.getValue()) {
                                        triadicCensus = graph
                                                .run(new 
org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
                                                        
.setLittleParallelism(lp));
                                }
-                               break;
 
-                       case UNDIRECTED:
-                               result = graph
-                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, 
EV>()
+                               @SuppressWarnings("unchecked")
+                               DataSet<PrintableResult> directedResult = 
(DataSet<PrintableResult>) (DataSet<?>) graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
                                                
.setSortTriangleVertices(sortTriangleVertices.getValue())
                                                .setLittleParallelism(lp));
+                               return directedResult;
 
+                       case UNDIRECTED:
                                if (computeTriadicCensus.getValue()) {
                                        triadicCensus = graph
                                                .run(new 
org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
                                                        
.setLittleParallelism(lp));
                                }
-                               break;
+
+                               @SuppressWarnings("unchecked")
+                               DataSet<PrintableResult> undirectedResult = 
(DataSet<PrintableResult>) (DataSet<?>) graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, 
EV>()
+                                               
.setSortTriangleVertices(sortTriangleVertices.getValue())
+                                               .setLittleParallelism(lp));
+                               return undirectedResult;
+
+                       default:
+                               throw new RuntimeException("Unknown order: " + 
order);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
index e4e6a4c..d64534b 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -96,6 +96,9 @@ implements Input<K, NullValue, NullValue> {
                .setMinimumValue(0.0, true)
                .setMaximumValue(2.0, true);
 
+       private LongParameter seed = new LongParameter(this, "seed")
+               .setDefaultValue(JDKRandomGeneratorFactory.DEFAULT_SEED);
+
        private Simplify simplify = new Simplify(this);
 
        private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index a27ca29..f73d37b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -21,11 +21,11 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeOrder;
@@ -118,7 +118,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
        public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // s, t, bitmask
-               DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges()
+               DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges()
                        .flatMap(new EmitAndFlipEdge<K, EV>())
                                .setParallelism(parallelism)
                                .name("Emit and flip edge")
@@ -128,9 +128,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
                                .name("Reduce bitmask");
 
                // s, d(s)
-               DataSet<Vertex<K, Degrees>> vertexDegrees = edgesWithOrder
+               DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder
                        .groupBy(0)
-                       .sortGroup(1, Order.ASCENDING)
                        .reduceGroup(new DegreeCount<K>())
                                .setParallelism(parallelism)
                                .name("Degree count");
@@ -178,22 +177,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
         *
         * @param <T> ID type
         */
-       @ForwardedFields("0; 1")
+       @ForwardedFields("0")
        private static final class ReduceBitmask<T>
-       implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> {
+       implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple2<T, ByteValue>> {
+               private Tuple2<T, ByteValue> output = new Tuple2<>(null, new ByteValue());
+
                @Override
-               public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out)
+               public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple2<T, ByteValue>> out)
                                throws Exception {
-                       Tuple3<T, T, ByteValue> output = null;
-
                        byte bitmask = 0;
 
                        for (Tuple3<T, T, ByteValue> value: values) {
-                               output = value;
+                               output.f0 = value.f0;
                                bitmask |= value.f2.getValue();
                        }
 
-                       output.f2.setValue(bitmask);
+                       output.f1.setValue(bitmask);
                        out.collect(output);
                }
        }
@@ -203,21 +202,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
         *
         * @param <T> ID type
         */
+       @ForwardedFields("0")
        private static class DegreeCount<T>
-       implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Vertex<T, Degrees>> {
+       implements GroupReduceFunction<Tuple2<T, ByteValue>, Vertex<T, Degrees>> {
                private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees());
 
                @Override
-               public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Vertex<T, Degrees>> out)
+               public void reduce(Iterable<Tuple2<T, ByteValue>> values, Collector<Vertex<T, Degrees>> out)
                                throws Exception {
                        long degree = 0;
                        long outDegree = 0;
                        long inDegree = 0;
 
-                       for (Tuple3<T, T, ByteValue> edge : values) {
+                       for (Tuple2<T, ByteValue> edge : values) {
                                output.f0 = edge.f0;
 
-                               byte bitmask = edge.f2.getValue();
+                               byte bitmask = edge.f1.getValue();
 
                                degree++;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
index 5bb2f4c..a7d6ef1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
@@ -25,13 +25,15 @@ import org.apache.flink.types.DoubleValue;
 
 class Functions {
 
+       private Functions() {}
+
        /**
         * Sum vertices' scores.
         *
         * @param <T> ID type
         */
        @ForwardedFields("0")
-       static class SumScore<T>
+       protected static final class SumScore<T>
                implements ReduceFunction<Tuple2<T, DoubleValue>> {
                @Override
                public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index 747735e..c5c4178 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -45,6 +45,7 @@ import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
 import org.apache.flink.graph.library.link_analysis.PageRank.Result;
 import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.graph.utils.GraphUtils.IdentityMapper;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.DoubleValue;
@@ -175,6 +176,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
                        .run(new VertexDegrees<K, VV, EV>()
                                .setParallelism(parallelism));
 
+               // prevent Exception "The dam has been closed." in TempBarrier
+               // for a simplified Graph as in PageRankITCase (see FLINK-5623)
+               vertexDegree = vertexDegree.map(new IdentityMapper<Vertex<K, Degrees>>());
+
                // vertex count
                DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 2f4516a..bc3cb86 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -78,7 +78,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        private int maximumScoreNumerator = 1;
 
-       private int maximumScoreDenominator = 0;
+       private int maximumScoreDenominator = 1;
 
        private int littleParallelism = PARALLELISM_DEFAULT;
 
@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
        }
 
        /**
-        * Filter out Jaccard Index scores greater than or equal to the given maximum fraction.
+        * Filter out Jaccard Index scores greater than the given maximum fraction.
         *
         * @param numerator numerator of the maximum score
         * @param denominator denominator of the maximum score
@@ -253,6 +253,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
         * number of groups and {@link GenerateGroups} emits each edge into each group.
         *
         * @param <T> ID type
+        * @param <ET> edge value type
         */
        @ForwardedFields("0->1; 1->2")
        private static class GenerateGroupSpans<T, ET>
@@ -439,7 +440,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
                        if (unboundedScores ||
                                        (count * minimumScoreDenominator >= distinctNeighbors * minimumScoreNumerator
-                                               && count * maximumScoreDenominator < distinctNeighbors * maximumScoreNumerator)) {
+                                               && count * maximumScoreDenominator <= distinctNeighbors * maximumScoreNumerator)) {
                                output.f0 = edge.f0;
                                output.f1 = edge.f1;
                                output.f2.setValue(count);

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
index ee9b770..4ba8e39 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
@@ -30,6 +30,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -54,7 +55,7 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
 
        public ValueArrayTypeInfo(TypeInformation<T> valueType) {
                this.valueType = valueType;
-               this.type = valueType.getTypeClass();
+               this.type = valueType == null ? null : valueType.getTypeClass();
        }
 
        @Override
@@ -85,12 +86,16 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
 
        @Override
        public boolean isKeyType() {
+               Preconditions.checkNotNull(type, "TypeInformation type class is required");
+
                return Comparable.class.isAssignableFrom(type);
        }
 
        @Override
        @SuppressWarnings("unchecked")
        public TypeSerializer<ValueArray<T>> createSerializer(ExecutionConfig executionConfig) {
+               Preconditions.checkNotNull(type, "TypeInformation type class is required");
+
                if (IntValue.class.isAssignableFrom(type)) {
                        return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new IntValueArraySerializer();
                } else if (LongValue.class.isAssignableFrom(type)) {
@@ -107,6 +112,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public TypeComparator<ValueArray<T>> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+               Preconditions.checkNotNull(type, "TypeInformation type class is required");
+
                if (IntValue.class.isAssignableFrom(type)) {
                        return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new IntValueArrayComparator(sortOrderAscending);
                } else if (LongValue.class.isAssignableFrom(type)) {
@@ -131,6 +138,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
 
        @Override
        public int hashCode() {
+               Preconditions.checkNotNull(type, "TypeInformation type class is required");
+
                return type.hashCode();
        }
 
@@ -154,6 +163,8 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
 
        @Override
        public String toString() {
+               Preconditions.checkNotNull(type, "TypeInformation type class is required");
+
                return "ValueArrayType<" + type.getSimpleName() + ">";
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
index 2145c3d..1cfb415 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
@@ -36,6 +36,9 @@ public class ValueArrayTypeInfoFactory<T> extends TypeInfoFactory<ValueArray<T>>
 
        @Override
        public TypeInformation<ValueArray<T>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
-               return new ValueArrayTypeInfo(genericParameters.get("T"));
+               @SuppressWarnings("unchecked")
+               TypeInformation<ValueArray<T>> typeInfo = new ValueArrayTypeInfo(genericParameters.get("T"));
+
+               return typeInfo;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 78fb378..5292751 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -22,8 +22,10 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.asm.translate.TranslateFunction;
 import org.apache.flink.types.LongValue;
 
 import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
@@ -51,6 +53,7 @@ public class GraphUtils {
         *
         * @param <T> element type
         */
+       @ForwardedFields("*")
        public static final class IdentityMapper<T>
        implements MapFunction<T, T> {
                public T map(T value) {
@@ -59,13 +62,27 @@ public class GraphUtils {
        }
 
        /**
+        * The identity mapper returns the input as output.
+        *
+        * This does not forward fields and is used to break an operator chain.
+        *
+        * @param <T> element type
+        */
+       public static final class NonForwardingIdentityMapper<T>
+       implements MapFunction<T, T> {
+               public T map(T value) {
+                       return value;
+               }
+       }
+
+       /**
         * Map each element to a value.
         *
         * @param <I> input type
         * @param <O> output type
         */
        public static class MapTo<I, O>
-       implements MapFunction<I, O>, ResultTypeQueryable<O> {
+       implements MapFunction<I, O>, ResultTypeQueryable<O>, TranslateFunction<I, O> {
                private final O value;
 
                /**
@@ -78,7 +95,13 @@ public class GraphUtils {
                }
 
                @Override
-               public O map(I o) throws Exception {
+               public O map(I input) throws Exception {
+                       return value;
+               }
+
+               @Override
+               public O translate(I input, O reuse)
+                               throws Exception {
                        return value;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59cf7032/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 128ee70..2443359 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -81,6 +81,8 @@ extends AsmTestBase {
                String expectedResult =
                        "(0,1,1,4)\n" +
                        "(0,2,1,4)\n" +
+                       "(0,3,2,4)\n" +
+                       "(1,2,2,4)\n" +
                        "(1,3,1,6)\n" +
                        "(1,4,1,3)\n" +
                        "(1,5,1,3)\n" +

Reply via email to