Repository: flink
Updated Branches:
  refs/heads/master 9ab494a73 -> 365cd987c


[FLINK-4646] [gelly] Add BipartiteGraph

This closes #2564


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/365cd987
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/365cd987
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/365cd987

Branch: refs/heads/master
Commit: 365cd987cc90fa9b399acbb4fe0af3f995f604e3
Parents: 9ab494a
Author: Ivan Mushketyk <[email protected]>
Authored: Tue Sep 27 23:14:09 2016 +0100
Committer: Greg Hogan <[email protected]>
Committed: Fri Dec 9 13:58:50 2016 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Edge.java  |  12 +-
 .../flink/graph/bipartite/BipartiteEdge.java    |  68 ++++
 .../flink/graph/bipartite/BipartiteGraph.java   | 319 +++++++++++++++++++
 .../flink/graph/bipartite/Projection.java       |  76 +++++
 .../graph/bipartite/BipartiteEdgeTest.java      |  70 ++++
 .../graph/bipartite/BipartiteGraphTest.java     | 146 +++++++++
 .../flink/graph/bipartite/ProjectionTest.java   |  76 +++++
 .../apache/flink/graph/generator/TestUtils.java |  11 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  74 ++---
 9 files changed, 807 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
index 2bcce29..8e5f916 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -34,10 +34,10 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
 
        public Edge(){}
 
-       public Edge(K src, K trg, V val) {
-               this.f0 = src;
-               this.f1 = trg;
-               this.f2 = val;
+       public Edge(K source, K target, V value) {
+               this.f0 = source;
+               this.f1 = target;
+               this.f2 = value;
        }
 
        /**
@@ -49,8 +49,8 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
                        return new Edge<>(this.f1, this.f0, this.f2);
        }
 
-       public void setSource(K src) {
-               this.f0 = src;
+       public void setSource(K source) {
+               this.f0 = source;
        }
 
        public K getSource() {

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
new file mode 100644
index 0000000..167e4ec
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+
+/**
+ * A BipartiteEdge represents a link between top and bottom vertices
+ * in a {@link BipartiteGraph}. It is a generalized form of {@link Edge}
+ * where the source and target vertex IDs can be of different types.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteEdge<KT, KB, EV> extends Tuple3<KT, KB, EV> {
+
+       private static final long serialVersionUID = 1L;
+
+       public BipartiteEdge() {}
+
+       public BipartiteEdge(KT topId, KB bottomId, EV value) {
+               this.f0 = topId;
+               this.f1 = bottomId;
+               this.f2 = value;
+       }
+
+       public KT getTopId() {
+               return this.f0;
+       }
+
+       public void setTopId(KT topId) {
+               this.f0 = topId;
+       }
+
+       public KB getBottomId() {
+               return this.f1;
+       }
+
+       public void setBottomId(KB bottomId) {
+               this.f1 = bottomId;
+       }
+
+       public EV getValue() {
+               return this.f2;
+       }
+
+       public void setValue(EV value) {
+               this.f2 = value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
new file mode 100644
index 0000000..b325103
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+/**
+ * The vertices of a bipartite graph are divided into two disjoint sets, 
referenced by the names "top" and "bottom".
+ * Top and bottom vertices with the same key value represent distinct entities 
and must be specially handled
+ * when projecting to a simple {@link Graph}. Edges can only exist between a 
pair of vertices from different vertex
+ * sets. E.g. there can be no edges between a pair of top vertices.
+ *
+ * <p>Bipartite graphs are useful to represent graphs with two sets of 
objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a 
particular author.
+ *
+ * <p>The bipartite interface is different from the {@link Graph} interface, so to 
apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. 
This can be achieved by using
+ * the projection methods.
+ *
+ * @param <KT> the key type of top vertices
+ * @param <KB> the key type of bottom vertices
+ * @param <VVT> the vertex value type of top vertices
+ * @param <VVB> the vertex value type of bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+
+       private final ExecutionEnvironment context;
+       private final DataSet<Vertex<KT, VVT>> topVertices;
+       private final DataSet<Vertex<KB, VVB>> bottomVertices;
+       private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+       private BipartiteGraph(
+                       DataSet<Vertex<KT, VVT>> topVertices,
+                       DataSet<Vertex<KB, VVB>> bottomVertices,
+                       DataSet<BipartiteEdge<KT, KB, EV>> edges,
+                       ExecutionEnvironment context) {
+               this.topVertices = topVertices;
+               this.bottomVertices = bottomVertices;
+               this.edges = edges;
+               this.context = context;
+       }
+
+       /**
+        * Create bipartite graph from datasets.
+        *
+        * @param topVertices dataset of top vertices in the graph
+        * @param bottomVertices dataset of bottom vertices in the graph
+        * @param edges dataset of edges between vertices
+        * @param context Flink execution context
+        * @return new bipartite graph created from provided datasets
+        */
+       public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, 
EV> fromDataSet(
+                       DataSet<Vertex<KT, VVT>> topVertices,
+                       DataSet<Vertex<KB, VVB>> bottomVertices,
+                       DataSet<BipartiteEdge<KT, KB, EV>> edges,
+                       ExecutionEnvironment context) {
+               return new BipartiteGraph<>(topVertices, bottomVertices, edges, 
context);
+       }
+
+       /**
+        * Get dataset with top vertices.
+        *
+        * @return dataset with top vertices
+        */
+       public DataSet<Vertex<KT, VVT>> getTopVertices() {
+               return topVertices;
+       }
+
+       /**
+        * Get dataset with bottom vertices.
+        *
+        * @return dataset with bottom vertices
+        */
+       public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+               return bottomVertices;
+       }
+
+       /**
+        * Get dataset with graph edges.
+        *
+        * @return dataset with graph edges
+        */
+       public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
+               return edges;
+       }
+
+       /**
+        * Convert a bipartite graph into an undirected graph that contains 
only top vertices. An edge between two vertices
+        * in the new graph will exist only if the original bipartite graph 
contains a bottom vertex they are both
+        * connected to.
+        *
+        * The simple projection performs a single join and returns edges 
containing the bipartite edge values.
+        *
+        * Note: KT must override .equals(). This requirement may be removed in 
a future release.
+        *
+        * @return simple top projection of the bipartite graph
+        */
+       public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
+               DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges = edges.join(edges)
+                       .where(1)
+                       .equalTo(1)
+                       .with(new ProjectionTopSimple<KT, KB, EV>())
+                               .name("Simple top projection");
+
+               return Graph.fromDataSet(topVertices, newEdges, context);
+       }
+
+       @ForwardedFieldsFirst("0; 2->2.0")
+       @ForwardedFieldsSecond("0->1; 2->2.1")
+       private static class ProjectionTopSimple<KT, KB, EV>
+       implements FlatJoinFunction<BipartiteEdge<KT, KB, EV>, 
BipartiteEdge<KT, KB, EV>, Edge<KT, Tuple2<EV, EV>>> {
+               private Tuple2<EV, EV> edgeValues = new Tuple2<>();
+
+               private Edge<KT, Tuple2<EV, EV>> edge = new Edge<>(null, null, 
edgeValues);
+
+               @Override
+               public void join(BipartiteEdge<KT, KB, EV> first, 
BipartiteEdge<KT, KB, EV> second, Collector<Edge<KT, Tuple2<EV, EV>>> out)
+                               throws Exception {
+                       if (!first.f0.equals(second.f0)) {
+                               edge.f0 = first.f0;
+                               edge.f1 = second.f0;
+
+                               edgeValues.f0 = first.f2;
+                               edgeValues.f1 = second.f2;
+
+                               out.collect(edge);
+                       }
+               }
+       }
+
+       /**
+        * Convert a bipartite graph into an undirected graph that contains 
only bottom vertices. An edge between two
+        * vertices in the new graph will exist only if the original bipartite 
graph contains a top vertex they are both
+        * connected to.
+        *
+        * The simple projection performs a single join and returns edges 
containing the bipartite edge values.
+        *
+        * Note: KB must override .equals(). This requirement may be removed in 
a future release.
+        *
+        * @return simple bottom projection of the bipartite graph
+        */
+       public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
+               DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
+                       .where(0)
+                       .equalTo(0)
+                       .with(new ProjectionBottomSimple<KT, KB, EV>())
+                       .name("Simple bottom projection");
+
+               return Graph.fromDataSet(bottomVertices, newEdges, context);
+       }
+
+       @ForwardedFieldsFirst("1->0; 2->2.0")
+       @ForwardedFieldsSecond("1; 2->2.1")
+       private static class ProjectionBottomSimple<KT, KB, EV>
+       implements FlatJoinFunction<BipartiteEdge<KT, KB, EV>, 
BipartiteEdge<KT, KB, EV>, Edge<KB, Tuple2<EV, EV>>> {
+               private Tuple2<EV, EV> edgeValues = new Tuple2<>();
+
+               private Edge<KB, Tuple2<EV, EV>> edge = new Edge<>(null, null, 
edgeValues);
+
+               @Override
+               public void join(BipartiteEdge<KT, KB, EV> first, 
BipartiteEdge<KT, KB, EV> second, Collector<Edge<KB, Tuple2<EV, EV>>> out)
+                               throws Exception {
+                       if (!first.f1.equals(second.f1)) {
+                               edge.f0 = first.f1;
+                               edge.f1 = second.f1;
+
+                               edgeValues.f0 = first.f2;
+                               edgeValues.f1 = second.f2;
+
+                               out.collect(edge);
+                       }
+               }
+       }
+
+       /**
+        * Convert a bipartite graph into a graph that contains only top 
vertices. An edge between two vertices in the new
+        * graph will exist only if the original bipartite graph contains at 
least one bottom vertex they both connect to.
+        *
+        * The full projection performs three joins and returns edges 
containing the connecting vertex ID and value,
+        * both top vertex values, and both bipartite edge values.
+        *
+        * Note: KT must override .equals(). This requirement may be removed in 
a future release.
+        *
+        * @return full top projection of the bipartite graph
+        */
+       public Graph<KT, VVT, Projection<KB, VVB, VVT, EV>> projectionTopFull() 
{
+               DataSet<Tuple5<KT, KB, EV, VVT, VVB>> edgesWithVertices = 
joinEdgeWithVertices();
+
+               DataSet<Edge<KT, Projection<KB, VVB, VVT, EV>>> newEdges = 
edgesWithVertices.join(edgesWithVertices)
+                       .where(1)
+                       .equalTo(1)
+                       .with(new ProjectionTopFull<KT, KB, EV, VVT, VVB>())
+                               .name("Full top projection");
+
+               return Graph.fromDataSet(topVertices, newEdges, context);
+       }
+
+       private DataSet<Tuple5<KT, KB, EV, VVT, VVB>> joinEdgeWithVertices() {
+               return edges
+                       .join(topVertices, JoinHint.REPARTITION_HASH_SECOND)
+                       .where(0)
+                       .equalTo(0)
+                       .projectFirst(0, 1, 2)
+                       .<Tuple4<KT, KB, EV, VVT>>projectSecond(1)
+                               .name("Edge with vertex")
+                       .join(bottomVertices, JoinHint.REPARTITION_HASH_SECOND)
+                       .where(1)
+                       .equalTo(0)
+                       .projectFirst(0, 1, 2, 3)
+                       .<Tuple5<KT, KB, EV, VVT, VVB>>projectSecond(1)
+                               .name("Edge with vertices");
+       }
+
+       @ForwardedFieldsFirst("0; 1->2.0; 2->2.4; 3->2.2; 4->2.1")
+       @ForwardedFieldsSecond("0->1; 2->2.5; 3->2.3")
+       private static class ProjectionTopFull<KT, KB, EV, VVT, VVB>
+       implements FlatJoinFunction<Tuple5<KT, KB, EV, VVT, VVB>, Tuple5<KT, 
KB, EV, VVT, VVB>, Edge<KT, Projection<KB, VVB, VVT, EV>>> {
+               private Projection<KB, VVB, VVT, EV> projection = new 
Projection<>();
+
+               private Edge<KT, Projection<KB, VVB, VVT, EV>> edge = new 
Edge<>(null, null, projection);
+
+               @Override
+               public void join(Tuple5<KT, KB, EV, VVT, VVB> first, Tuple5<KT, 
KB, EV, VVT, VVB> second, Collector<Edge<KT, Projection<KB, VVB, VVT, EV>>> out)
+                               throws Exception {
+                       if (!first.f0.equals(second.f0)) {
+                               edge.f0 = first.f0;
+                               edge.f1 = second.f0;
+
+                               projection.f0 = first.f1;
+                               projection.f1 = first.f4;
+                               projection.f2 = first.f3;
+                               projection.f3 = second.f3;
+                               projection.f4 = first.f2;
+                               projection.f5 = second.f2;
+
+                               out.collect(edge);
+                       }
+               }
+       }
+
+       /**
+        * Convert a bipartite graph into a graph that contains only bottom 
vertices. An edge between two vertices in the
+        * new graph will exist only if the original bipartite graph contains 
at least one top vertex they both connect to.
+        *
+        * The full projection performs three joins and returns edges 
containing the connecting vertex ID and value,
+        * both bottom vertex values, and both bipartite edge values.
+        *
+        * Note: KB must override .equals(). This requirement may be removed in 
a future release.
+        *
+        * @return full bottom projection of the bipartite graph
+        */
+       public Graph<KB, VVB, Projection<KT, VVT, VVB, EV>> 
projectionBottomFull() {
+               DataSet<Tuple5<KT, KB, EV, VVT, VVB>> edgesWithVertices = 
joinEdgeWithVertices();
+
+               DataSet<Edge<KB, Projection<KT, VVT, VVB, EV>>> newEdges = 
edgesWithVertices.join(edgesWithVertices)
+                       .where(0)
+                       .equalTo(0)
+                       .with(new ProjectionBottomFull<KT, KB, EV, VVT, VVB>())
+                               .name("Full bottom projection");
+
+               return Graph.fromDataSet(bottomVertices, newEdges, context);
+       }
+
+       @ForwardedFieldsFirst("1->0; 2->2.4; 3->2.1; 4->2.2")
+       @ForwardedFieldsSecond("1; 2->2.5; 4->2.3")
+       private static class ProjectionBottomFull<KT, KB, EV, VVT, VVB>
+       implements FlatJoinFunction<Tuple5<KT, KB, EV, VVT, VVB>, Tuple5<KT, 
KB, EV, VVT, VVB>, Edge<KB, Projection<KT, VVT, VVB, EV>>> {
+               private Projection<KT, VVT, VVB, EV> projection = new 
Projection<>();
+
+               private Edge<KB, Projection<KT, VVT, VVB, EV>> edge = new 
Edge<>(null, null, projection);
+
+               @Override
+               public void join(Tuple5<KT, KB, EV, VVT, VVB> first, Tuple5<KT, 
KB, EV, VVT, VVB> second, Collector<Edge<KB, Projection<KT, VVT, VVB, EV>>> out)
+                       throws Exception {
+                       if (!first.f1.equals(second.f1)) {
+                               edge.f0 = first.f1;
+                               edge.f1 = second.f1;
+
+                               projection.f0 = first.f0;
+                               projection.f1 = first.f3;
+                               projection.f2 = first.f4;
+                               projection.f3 = second.f4;
+                               projection.f4 = first.f2;
+                               projection.f5 = second.f2;
+
+                               out.collect(edge);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
new file mode 100644
index 0000000..95a9cf6
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.graph.Vertex;
+
+/**
+ * The edge value of a full bipartite projection contains:
+ * <ul>
+ *     <li>the ID and vertex value of the connecting vertex</li>
+ *     <li>the vertex value for the source and target vertex</li>
+ *     <li>both edge values from the bipartite edges</li>
+ * </ul>
+ *
+ * @param <KC> the key type of connecting vertices
+ * @param <VVC> the vertex value type of connecting vertices
+ * @param <VV> the vertex value type of top or bottom vertices
+ * @param <EV> the edge value type from bipartite edges
+  */
+public class Projection<KC, VVC, VV, EV> extends Tuple6<KC, VVC, VV, VV, EV, 
EV> {
+
+       public Projection() {}
+
+       public Projection(
+                       Vertex<KC, VVC> connectingVertex,
+                       VV sourceVertexValue, VV targetVertexValue,
+                       EV sourceEdgeValue, EV targetEdgeValue) {
+               this.f0 = connectingVertex.getId();
+               this.f1 = connectingVertex.getValue();
+               this.f2 = sourceVertexValue;
+               this.f3 = targetVertexValue;
+               this.f4 = sourceEdgeValue;
+               this.f5 = targetEdgeValue;
+       }
+
+       public KC getIntermediateVertexId() {
+               return this.f0;
+       }
+
+       public VVC getIntermediateVertexValue() {
+               return this.f1;
+       }
+
+       public VV getsSourceVertexValue() {
+               return this.f2;
+       }
+
+       public VV getTargetVertexValue() {
+               return this.f3;
+       }
+
+       public EV getSourceEdgeValue() {
+               return this.f4;
+       }
+
+       public EV getTargetEdgeValue() {
+               return this.f5;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
new file mode 100644
index 0000000..ad0106b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BipartiteEdgeTest {
+
+       private static final int BOTTOM_ID = 0;
+       private static final int TOP_ID = 1;
+       private static final String VALUE = "value";
+
+       private final BipartiteEdge<Integer, Integer, String> edge = 
createEdge();
+
+       @Test
+       public void testGetBottomId() {
+               assertEquals(BOTTOM_ID, (long) edge.getBottomId());
+       }
+
+       @Test
+       public void testGetTopId() {
+               assertEquals(TOP_ID, (long) edge.getTopId());
+       }
+
+       @Test
+       public void testGetValue() {
+               assertEquals(VALUE, edge.getValue());
+       }
+
+       @Test
+       public void testSetBottomId() {
+               edge.setBottomId(100);
+               assertEquals(100, (long) edge.getBottomId());
+       }
+
+       @Test
+       public void testSetTopId() {
+               edge.setTopId(100);
+               assertEquals(100, (long) edge.getTopId());
+       }
+
+       @Test
+       public void testSetValue() {
+               edge.setValue("newVal");
+               assertEquals("newVal", edge.getValue());
+       }
+
+       private BipartiteEdge<Integer, Integer, String> createEdge() {
+               return new BipartiteEdge<>(TOP_ID, BOTTOM_ID, VALUE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
new file mode 100644
index 0000000..366cf8e
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.apache.flink.graph.generator.TestUtils.compareGraph;
+import static org.junit.Assert.assertEquals;
+
+public class BipartiteGraphTest {
+
+       @Test
+       public void testGetTopVertices() throws Exception {
+               BipartiteGraph<Integer, Integer, String, String, String> 
bipartiteGraph = createBipartiteGraph();
+
+               assertEquals(
+                       Arrays.asList(
+                               new Vertex<>(4, "top4"),
+                               new Vertex<>(5, "top5"),
+                               new Vertex<>(6, "top6")),
+                       bipartiteGraph.getTopVertices().collect());
+       }
+
+       @Test
+       public void testGetBottomVertices() throws Exception {
+               BipartiteGraph<Integer, Integer, String, String, String> 
bipartiteGraph = createBipartiteGraph();
+
+               assertEquals(
+                       Arrays.asList(
+                               new Vertex<>(1, "bottom1"),
+                               new Vertex<>(2, "bottom2"),
+                               new Vertex<>(3, "bottom3")),
+                       bipartiteGraph.getBottomVertices().collect());
+       }
+
+       @Test
+       public void testSimpleTopProjection() throws Exception {
+               BipartiteGraph<Integer, Integer, String, String, String> 
bipartiteGraph = createBipartiteGraph();
+               Graph<Integer, String, Tuple2<String, String>> graph = 
bipartiteGraph.projectionTopSimple();
+
+               compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5");
+
+               String expected =
+                       "(5,4,(5-1,4-1))\n" +
+                       "(4,5,(4-1,5-1))\n" +
+                       "(6,5,(6-2,5-2))\n" +
+                       "(5,6,(5-2,6-2))";
+               TestBaseUtils.compareResultAsText(graph.getEdges().collect(), 
expected);
+       }
+
+       @Test
+       public void testSimpleBottomProjection() throws Exception {
+               BipartiteGraph<Integer, Integer, String, String, String> 
bipartiteGraph = createBipartiteGraph();
+               Graph<Integer, String, Tuple2<String, String>> graph = 
bipartiteGraph.projectionBottomSimple();
+
+               compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2");
+
+               String expected =
+                       "(3,2,(6-3,6-2))\n" +
+                       "(2,3,(6-2,6-3))\n" +
+                       "(2,1,(5-2,5-1))\n" +
+                       "(1,2,(5-1,5-2))";
+               TestBaseUtils.compareResultAsText(graph.getEdges().collect(), 
expected);
+       }
+
+       @Test
+       public void testFullTopProjection() throws Exception {
+               BipartiteGraph<Integer, Integer, String, String, String> 
bipartiteGraph = createBipartiteGraph();
+               Graph<Integer, String, Projection<Integer, String, String, 
String>> graph = bipartiteGraph.projectionTopFull();
+
+               graph.getEdges().print();
+               compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5");
+
+               String expected =
+                       "(5,4,(1,bottom1,top5,top4,5-1,4-1))\n" +
+                       "(4,5,(1,bottom1,top4,top5,4-1,5-1))\n" +
+                       "(6,5,(2,bottom2,top6,top5,6-2,5-2))\n" +
+                       "(5,6,(2,bottom2,top5,top6,5-2,6-2))";
+               TestBaseUtils.compareResultAsText(graph.getEdges().collect(), 
expected);
+       }
+
+       @Test
+       public void testFullBottomProjection() throws Exception {
+               BipartiteGraph<Integer, Integer, String, String, String> 
bipartiteGraph = createBipartiteGraph();
+               Graph<Integer, String, Projection<Integer, String, String, 
String>> graph = bipartiteGraph.projectionBottomFull();
+
+               compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2");
+
+               String expected =
+                       "(3,2,(6,top6,bottom3,bottom2,6-3,6-2))\n" +
+                       "(2,3,(6,top6,bottom2,bottom3,6-2,6-3))\n" +
+                       "(2,1,(5,top5,bottom2,bottom1,5-2,5-1))\n" +
+                       "(1,2,(5,top5,bottom1,bottom2,5-1,5-2))";
+               TestBaseUtils.compareResultAsText(graph.getEdges().collect(), 
expected);
+       }
+
+       private BipartiteGraph<Integer, Integer, String, String, String> 
createBipartiteGraph() {
+               ExecutionEnvironment executionEnvironment = 
ExecutionEnvironment.createCollectionsEnvironment();
+
+               DataSet<Vertex<Integer, String>> topVertices = 
executionEnvironment.fromCollection(Arrays.asList(
+                       new Vertex<>(4, "top4"),
+                       new Vertex<>(5, "top5"),
+                       new Vertex<>(6, "top6")
+               ));
+
+               DataSet<Vertex<Integer, String>> bottomVertices = 
executionEnvironment.fromCollection(Arrays.asList(
+                       new Vertex<>(1, "bottom1"),
+                       new Vertex<>(2, "bottom2"),
+                       new Vertex<>(3, "bottom3")
+               ));
+
+               DataSet<BipartiteEdge<Integer, Integer, String>> edges = 
executionEnvironment.fromCollection(Arrays.asList(
+                       new BipartiteEdge<>(4, 1, "4-1"),
+                       new BipartiteEdge<>(5, 1, "5-1"),
+                       new BipartiteEdge<>(5, 2, "5-2"),
+                       new BipartiteEdge<>(6, 2, "6-2"),
+                       new BipartiteEdge<>(6, 3, "6-3")
+               ));
+
+               return BipartiteGraph.fromDataSet(topVertices, bottomVertices, 
edges, executionEnvironment);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
new file mode 100644
index 0000000..3aafe64
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.graph.Vertex;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProjectionTest {
+
+       private static final int ID = 10;
+
+       private static final String VERTEX_VALUE = "vertex-value";
+       private static final String SOURCE_EDGE_VALUE = "source-edge-value";
+       private static final String TARGET_EDGE_VALUE = "target-edge-value";
+       private static final String SOURCE_VERTEX_VALUE = "source-vertex-value";
+       private static final String TARGET_VERTEX_VALUE = "target-vertex-value";
+
+       private Projection<Integer, String, String, String> projection = 
createProjection();
+
+       @Test
+       public void testIntermediateVertexGetId() {
+               assertEquals(Integer.valueOf(ID), 
projection.getIntermediateVertexId());
+       }
+
+       @Test
+       public void testGetIntermediateVertexValue() {
+               assertEquals(VERTEX_VALUE, 
projection.getIntermediateVertexValue());
+       }
+
+       @Test
+       public void testGetSourceEdgeValue() {
+               assertEquals(SOURCE_EDGE_VALUE, 
projection.getSourceEdgeValue());
+       }
+
+       @Test
+       public void testGetTargetEdgeValue() {
+               assertEquals(TARGET_EDGE_VALUE, 
projection.getTargetEdgeValue());
+       }
+
+       @Test
+       public void testGetSourceVertexValue() {
+               assertEquals(SOURCE_VERTEX_VALUE, 
projection.getsSourceVertexValue());
+       }
+
+       @Test
+       public void testGetTargetVertexValue() {
+               assertEquals(TARGET_VERTEX_VALUE, 
projection.getTargetVertexValue());
+       }
+
+       private Projection<Integer, String, String, String> createProjection() {
+               return new Projection<>(
+                       new Vertex<>(ID, VERTEX_VALUE),
+                       SOURCE_VERTEX_VALUE,
+                       TARGET_VERTEX_VALUE,
+                       SOURCE_EDGE_VALUE,
+                       TARGET_EDGE_VALUE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
index 3ea5a44..a302a30 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -50,7 +50,12 @@ public final class TestUtils {
         */
        public static <K,VV,EV> void compareGraph(Graph<K,VV,EV> graph, String 
expectedVertices, String expectedEdges)
                        throws Exception {
-               // Vertices
+               compareVertices(graph, expectedVertices);
+               compareEdges(graph, expectedEdges);
+       }
+
+       private static <K, VV, EV> void compareVertices(Graph<K, VV, EV> graph, 
String expectedVertices)
+                       throws Exception {
                if (expectedVertices != null) {
                        List<String> resultVertices = new ArrayList<>();
 
@@ -60,8 +65,10 @@ public final class TestUtils {
 
                        TestBaseUtils.compareResultAsText(resultVertices, 
expectedVertices.replaceAll("\\s","").replace(";", "\n"));
                }
+       }
 
-               // Edges
+       private static <K, VV, EV> void compareEdges(Graph<K, VV, EV> graph, 
String expectedEdges)
+                       throws Exception {
                if (expectedEdges != null) {
                        List<String> resultEdges = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 804b3d4..5e15076 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -18,17 +18,12 @@
 
 package org.apache.flink.test.util;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -37,14 +32,10 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
-
 import org.apache.hadoop.fs.FileSystem;
-
 import org.junit.Assert;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
@@ -77,6 +68,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestBaseUtils extends TestLogger {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(TestBaseUtils.class);
@@ -92,32 +86,32 @@ public class TestBaseUtils extends TestLogger {
        public static FiniteDuration DEFAULT_TIMEOUT = new 
FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 
        // ------------------------------------------------------------------------
-       
+
        protected static File logDir;
 
        protected TestBaseUtils(){
                verifyJvmOptions();
        }
-       
+
        private static void verifyJvmOptions() {
                long heap = Runtime.getRuntime().maxMemory() >> 20;
                Assert.assertTrue("Insufficient java heap space " + heap + "mb 
- set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
                                + "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
        }
-       
-       
+
+
        public static LocalFlinkMiniCluster startCluster(
                int numTaskManagers,
                int taskManagerNumSlots,
                boolean startWebserver,
                boolean startZooKeeper,
                boolean singleActorSystem) throws Exception {
-               
+
                Configuration config = new Configuration();
 
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskManagers);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
taskManagerNumSlots);
-               
+
                config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, 
startWebserver);
 
                if (startZooKeeper) {
@@ -146,7 +140,7 @@ public class TestBaseUtils extends TestLogger {
 
                config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
8081);
                config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
-               
+
                config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.toString());
 
                LocalFlinkMiniCluster cluster =  new 
LocalFlinkMiniCluster(config, singleActorSystem);
@@ -164,7 +158,7 @@ public class TestBaseUtils extends TestLogger {
                if (executor != null) {
                        int numUnreleasedBCVars = 0;
                        int numActiveConnections = 0;
-                       
+
                        if (executor.running()) {
                                List<ActorRef> tms = 
executor.getTaskManagersAsJava();
                                List<Future<Object>> 
bcVariableManagerResponseFutures = new ArrayList<>();
@@ -249,7 +243,7 @@ public class TestBaseUtils extends TestLogger {
                }
                return readers;
        }
-       
+
        public static BufferedInputStream[] getResultInputStream(String 
resultPath) throws IOException {
                return getResultInputStream(resultPath, new String[]{});
        }
@@ -268,13 +262,13 @@ public class TestBaseUtils extends TestLogger {
                readAllResultLines(target, resultPath, new String[]{});
        }
 
-       public static void readAllResultLines(List<String> target, String 
resultPath, String[] excludePrefixes) 
+       public static void readAllResultLines(List<String> target, String 
resultPath, String[] excludePrefixes)
                        throws IOException {
-               
+
                readAllResultLines(target, resultPath, excludePrefixes, false);
        }
 
-       public static void readAllResultLines(List<String> target, String 
resultPath, 
+       public static void readAllResultLines(List<String> target, String 
resultPath,
                                                                                
        String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
 
                final BufferedReader[] readers = getResultReader(resultPath, 
excludePrefixes, inOrderOfFiles);
@@ -453,14 +447,14 @@ public class TestBaseUtils extends TestLogger {
        public static <T> void compareOrderedResultAsText(List<T> result, 
String expected, boolean asTuples) {
                compareResult(result, expected, asTuples, false);
        }
-       
+
        private static <T> void compareResult(List<T> result, String expected, 
boolean asTuples, boolean sort) {
                String[] expectedStrings = expected.split("\n");
                String[] resultStrings = new String[result.size()];
-               
+
                for (int i = 0; i < resultStrings.length; i++) {
                        T val = result.get(i);
-                       
+
                        if (asTuples) {
                                if (val instanceof Tuple) {
                                        Tuple t = (Tuple) val;
@@ -480,19 +474,25 @@ public class TestBaseUtils extends TestLogger {
                                resultStrings[i] = (val == null) ? "null" : 
val.toString();
                        }
                }
-               
-               assertEquals("Wrong number of elements result", 
expectedStrings.length, resultStrings.length);
 
                if (sort) {
                        Arrays.sort(expectedStrings);
                        Arrays.sort(resultStrings);
                }
-               
+
+               // Include content of both arrays to provide more context in case of a test failure
+               String msg = String.format(
+                       "Different elements in arrays: expected %d elements and 
received %d\n expected: %s\n received: %s",
+                       expectedStrings.length, resultStrings.length,
+                       Arrays.toString(expectedStrings), 
Arrays.toString(resultStrings));
+
+               assertEquals(msg, expectedStrings.length, resultStrings.length);
+
                for (int i = 0; i < expectedStrings.length; i++) {
-                       assertEquals(expectedStrings[i], resultStrings[i]);
+                       assertEquals(msg, expectedStrings[i], resultStrings[i]);
                }
        }
-       
+
        // --------------------------------------------------------------------------------------------
        // Comparison methods for tests using sample
        // --------------------------------------------------------------------------------------------
@@ -523,7 +523,7 @@ public class TestBaseUtils extends TestLogger {
        // --------------------------------------------------------------------------------------------
        //  Miscellaneous helper methods
        // --------------------------------------------------------------------------------------------
-       
+
        protected static Collection<Object[]> toParameterList(Configuration ... 
testConfigs) {
                ArrayList<Object[]> configs = new ArrayList<>();
                for (Configuration testConfig : testConfigs) {
@@ -560,7 +560,7 @@ public class TestBaseUtils extends TestLogger {
                        System.err.println("Failed to delete file " + 
f.getAbsolutePath());
                }
        }
-       
+
        public static String constructTestPath(Class<?> forClass, String 
folder) {
                // we create test path that depends on class to prevent name clashes when two tests
                // create temp files with the same name
@@ -571,7 +571,7 @@ public class TestBaseUtils extends TestLogger {
                path += (forClass.getName() + "-" + folder);
                return path;
        }
-       
+
        public static String constructTestURI(Class<?> forClass, String folder) 
{
                return new File(constructTestPath(forClass, 
folder)).toURI().toString();
        }
@@ -597,7 +597,7 @@ public class TestBaseUtils extends TestLogger {
 
                return IOUtils.toString(is, connection.getContentEncoding() != 
null ? connection.getContentEncoding() : "UTF-8");
        }
-       
+
        public static class TupleComparator<T extends Tuple> implements 
Comparator<T> {
 
                @Override
@@ -612,7 +612,7 @@ public class TestBaseUtils extends TestLogger {
                                for (int i = 0; i < o1.getArity(); i++) {
                                        Object val1 = o1.getField(i);
                                        Object val2 = o2.getField(i);
-                                       
+
                                        int cmp;
                                        if (val1 != null && val2 != null) {
                                                cmp = compareValues(val1, val2);
@@ -620,16 +620,16 @@ public class TestBaseUtils extends TestLogger {
                                        else {
                                                cmp = val1 == null ? (val2 == 
null ? 0 : -1) : 1;
                                        }
-                                       
+
                                        if (cmp != 0) {
                                                return cmp;
                                        }
                                }
-                               
+
                                return 0;
                        }
                }
-               
+
                @SuppressWarnings("unchecked")
                private static <X extends Comparable<X>> int 
compareValues(Object o1, Object o2) {
                        if (o1 instanceof Comparable && o2 instanceof 
Comparable) {

Reply via email to