Repository: flink
Updated Branches:
  refs/heads/master 3dc7423c4 -> daa357aca


[FLINK-2411] [gelly] Add Summarization Algorithm

* implemented algorithm
* implemented integration tests
* updated gelly guide

This closes #1269


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daa357ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daa357ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daa357ac

Branch: refs/heads/master
Commit: daa357aca1fe7745eb7809ac384c5fdc14bd74db
Parents: 3dc7423
Author: Martin Junghanns <[email protected]>
Authored: Tue Oct 20 09:28:40 2015 +0200
Committer: vasia <[email protected]>
Committed: Mon Oct 26 11:12:03 2015 +0100

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  25 +
 .../main/java/org/apache/flink/graph/Graph.java |   6 +-
 .../graph/example/utils/SummarizationData.java  | 134 +++++
 .../flink/graph/library/Summarization.java      | 521 +++++++++++++++++++
 .../graph/test/library/SummarizationITCase.java | 189 +++++++
 5 files changed, 873 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 646ec7f..59e1a3b 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -1437,6 +1437,7 @@ Gelly has a growing collection of graph algorithms for 
easily analyzing large-sc
 * [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
 * [GSA Triangle Count](#gsa-triangle-count)
 * [Triangle Enumerator](#triangle-enumerator)
+* [Summarization](#summarization)
 
 Gelly's library methods can be used by simply calling the `run()` method on 
the input graph:
 
@@ -1617,4 +1618,28 @@ This implementation extends the basic algorithm by 
computing output degrees of e
 The algorithm takes a directed graph as input and outputs a `DataSet` of 
`Tuple3`. The Vertex ID type has to be `Comparable`.
 Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of 
the vertices forming the triangle.
 
+### Summarization
+
+#### Overview
+The summarization algorithm computes a condensed version of the input graph by 
grouping vertices and edges based on 
+their values. In doing so, the algorithm helps to uncover insights about 
patterns and distributions in the graph.
+One possible use case is the visualization of communities where the whole 
graph is too large and needs to be summarized
+based on the community identifier stored at a vertex.
+
+#### Details
+In the resulting graph, each vertex represents a group of vertices that share the same value. An edge that connects a
+vertex with itself represents all edges with the same edge value that connect vertices from the same vertex group. An
+edge between different vertices in the output graph represents all edges with the same edge value between members of
+different vertex groups in the input graph.
+
+The algorithm is implemented using Flink data operators. First, vertices are grouped by their value and a representative
+is chosen from each group. For each edge, the source and target vertex identifiers are replaced with the corresponding
+representatives, and the edges are then grouped by source, target and edge value. Output vertices and edges are created
+from their corresponding groupings.
+
+#### Usage
+The algorithm takes a directed, vertex (and possibly edge) attributed graph as 
input and outputs a new graph where each
+vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each 
+vertex and edge in the output graph stores the common group value and the 
number of represented elements.
+
 [Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 6015be4..68bfb5b 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1370,7 +1370,7 @@ public class Graph<K, VV, EV> {
                DataSet<Vertex<K, VV>> newVertices = 
getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
                                .with(new VerticesRemovalCoGroup<K, VV>());
 
-               DataSet < Edge < K, EV >> newEdges = 
newVertices.join(getEdges()).where(0).equalTo(0)
+               DataSet <Edge< K, EV>> newEdges = 
newVertices.join(getEdges()).where(0).equalTo(0)
                                // if the edge source was removed, the edge 
will also be removed
                                .with(new ProjectEdgeToBeRemoved<K, VV, EV>())
                                // if the edge target was removed, the edge 
will also be removed
@@ -1485,7 +1485,9 @@ public class Graph<K, VV, EV> {
 
        /**
         * Performs Difference on the vertex and edge sets of the input graphs
-        * removes common vertices and edges. If a source/target vertex is 
removed, its corresponding edge will also be removed
+        * removes common vertices and edges. If a source/target vertex is 
removed,
+        * its corresponding edge will also be removed
+        * 
         * @param graph the graph to perform difference with
         * @return a new graph where the common vertices and edges have been 
removed
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
new file mode 100644
index 0000000..88f76cc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.List;
+
+/**
+ * Test fixture for the Gelly summarization algorithm: a small input graph (vertices with
+ * {@link String} values, edges with {@link String} or {@link NullValue} values) together
+ * with the expected summarized vertices and edges.
+ */
+public class SummarizationData {
+
+       /** Static holder for test data; never instantiated. */
+       private SummarizationData() {}
+
+       /**
+        * Expected summarized vertices. The id of a summarized vertex may be the id of any of
+        * the input vertices it represents, so every admissible id is listed.
+        *
+        * Format:
+        *
+        * "possible-id[,possible-id];group-value,group-count"
+        */
+       public static final String[] EXPECTED_VERTICES = new String[] {
+                       "0,1;A,2",
+                       "2,3,4;B,3",
+                       "5;C,1"
+       };
+
+       /**
+        * Expected summarized edges for {@link #getVertices} combined with {@link #getEdges}.
+        *
+        * Format:
+        *
+        * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+        */
+       public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] {
+                       "0,1;0,1;A,2",
+                       "0,1;2,3,4;A,1",
+                       "2,3,4;0,1;A,1",
+                       "2,3,4;0,1;C,2",
+                       "2,3,4;2,3,4;B,2",
+                       "5;2,3,4;D,2"
+       };
+
+       /**
+        * Expected summarized edges for {@link #getVertices} combined with
+        * {@link #getEdgesWithAbsentValues}.
+        *
+        * Format:
+        *
+        * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+        */
+       public static final String[] EXPECTED_EDGES_ABSENT_VALUES = new String[] {
+                       "0,1;0,1;(null),2",
+                       "0,1;2,3,4;(null),1",
+                       "2,3,4;0,1;(null),3",
+                       "2,3,4;2,3,4;(null),2",
+                       "5;2,3,4;(null),2"
+       };
+
+       /**
+        * Creates six vertices with {@link String} values: 0 and 1 carry "A", 2 to 4 carry "B"
+        * and 5 carries "C".
+        *
+        * @param env execution environment
+        * @return vertex data set with string values
+        */
+       public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) {
+               final long[] ids = {0L, 1L, 2L, 3L, 4L, 5L};
+               final String[] values = {"A", "A", "B", "B", "B", "C"};
+
+               List<Vertex<Long, String>> vertices = Lists.newArrayListWithExpectedSize(ids.length);
+               for (int i = 0; i < ids.length; i++) {
+                       vertices.add(new Vertex<>(ids[i], values[i]));
+               }
+               return env.fromCollection(vertices);
+       }
+
+       /**
+        * Creates ten directed edges with {@link String} values; endpoints and values are listed
+        * pairwise below in emission order.
+        *
+        * @param env execution environment
+        * @return edge data set with string values
+        */
+       public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) {
+               final long[][] endpoints = {
+                               {0L, 1L}, {1L, 0L}, {1L, 2L}, {2L, 1L}, {2L, 3L},
+                               {3L, 2L}, {4L, 0L}, {4L, 1L}, {5L, 2L}, {5L, 3L}
+               };
+               final String[] values = {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
+
+               List<Edge<Long, String>> edges = Lists.newArrayListWithExpectedSize(endpoints.length);
+               for (int i = 0; i < endpoints.length; i++) {
+                       edges.add(new Edge<>(endpoints[i][0], endpoints[i][1], values[i]));
+               }
+               return env.fromCollection(edges);
+       }
+
+       /**
+        * Creates the same edges as {@link #getEdges}, but with {@link NullValue} as edge value.
+        *
+        * @param env execution environment
+        * @return edge data set with null values
+        */
+       public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) {
+               return getEdges(env).map(new EdgeValueToNullValueMapper());
+       }
+
+       /** Replaces the value of an edge with {@link NullValue}, keeping source and target. */
+       @SuppressWarnings("serial")
+       private static final class EdgeValueToNullValueMapper
+                       implements MapFunction<Edge<Long, String>, Edge<Long, NullValue>> {
+
+               @Override
+               public Edge<Long, NullValue> map(Edge<Long, String> edge) throws Exception {
+                       return new Edge<>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
new file mode 100644
index 0000000..0dcdc1f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+/**
+ * The summarization algorithm computes a condensed version of the input 
graph<br>
+ * by grouping vertices and edges based on their values. By doing this, the<br>
+ * algorithm helps to uncover insights about patterns and distributions in 
the<br>
+ * graph.
+ * <p>
+ * In the resulting graph, each vertex represents a group of vertices that share the<br>
+ * same vertex value. An edge that connects a vertex with itself represents all edges<br>
+ * with the same edge value that connect vertices inside that group. An edge between<br>
+ * two different vertices in the output graph represents all edges with the same edge<br>
+ * value between members of those groups in the input graph.
+ * <p>
+ * Consider the following example:
+ * <p>
+ * Input graph:
+ * <p>
+ * Vertices (id, value):<br>
+ * (0, "A")<br>
+ * (1, "A")<br>
+ * (2, "B")<br>
+ * (3, "B")<br>
+ * <p>
+ * Edges (source, target, value):<br>
+ * (0,1, null)<br>
+ * (1,0, null)<br>
+ * (1,2, null)<br>
+ * (2,1, null)<br>
+ * (2,3, null)<br>
+ * (3,2, null)<br>
+ * <p>
+ * Output graph:
+ * <p>
+ * Vertices (id, (value, count)):<br>
+ * (0, ("A", 2)) // 0 and 1 <br>
+ * (2, ("B", 2)) // 2 and 3 <br>
+ * <p>
+ * Edges (source, target, (value, count)):<br>
+ * (0, 0, (null, 2)) // (0,1) and (1,0) <br>
+ * (2, 2, (null, 2)) // (2,3) and (3,2) <br>
+ * (0, 2, (null, 1)) // (1,2) <br>
+ * (2, 0, (null, 1)) // (2,1) <br>
+ *
+ * Note that this implementation is non-deterministic in the way that it 
assigns<br>
+ * identifiers to summarized vertices. However, it is guaranteed that the 
identifier<br>
+ * is one of the represented vertex identifiers.
+ *
+ * @param <K>  vertex identifier type
+ * @param <VV>         vertex value type
+ * @param <EV>         edge value type
+ */
+public class Summarization<K, VV, EV>
+               implements GraphAlgorithm<K, VV, EV,
+               Graph<K, Summarization.VertexValue<VV>, 
Summarization.EdgeValue<EV>>> {
+
+       @Override
+       public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV> input) throws Exception {
+               // ---------------------------------------------------------------
+               // phase 1: summarize vertices
+               // ---------------------------------------------------------------
+
+               // group by vertex value (field 1); each group yields one item per member
+               // plus one item describing the whole group
+               UnsortedGrouping<Vertex<K, VV>> verticesByValue = input.getVertices().groupBy(1);
+               GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> groupItems =
+                               verticesByValue.reduceGroup(new VertexGroupReducer<K, VV>());
+
+               // items describing a whole group become the vertices of the summary graph
+               DataSet<Vertex<K, VertexValue<VV>>> newVertices = groupItems
+                               .filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>())
+                               .map(new VertexGroupItemToSummarizedVertexMapper<K, VV>());
+
+               // member items provide the mapping vertex id -> group representative id
+               DataSet<VertexWithRepresentative<K>> representatives = groupItems
+                               .filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
+                               .map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
+
+               // ---------------------------------------------------------------
+               // phase 2: summarize edges
+               // ---------------------------------------------------------------
+
+               // replace the source id (edge field 0) by the source's representative
+               DataSet<Edge<K, EV>> edgesWithNewSource = input.getEdges()
+                               .join(representatives)
+                               .where(0).equalTo(0)
+                               .with(new SourceVertexJoinFunction<K, EV>());
+
+               // replace the target id (edge field 1) by the target's representative
+               DataSet<Edge<K, EV>> rewrittenEdges = edgesWithNewSource
+                               .join(representatives)
+                               .where(1).equalTo(0)
+                               .with(new TargetVertexJoinFunction<K, EV>());
+
+               // collapse edges that agree on source (0), target (1) and value (2)
+               DataSet<Edge<K, EdgeValue<EV>>> newEdges = rewrittenEdges
+                               .groupBy(0, 1, 2)
+                               .reduceGroup(new EdgeGroupReducer<K, EV>());
+
+               return Graph.fromDataSet(newVertices, newEdges, input.getContext());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Tuple Types
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Value attached to every vertex of the summarized graph.
+        *
+        * f0: value shared by all vertices of the group
+        * f1: number of vertices in the group
+        *
+        * @param <VV> vertex value type
+        */
+       @SuppressWarnings("serial")
+       public static final class VertexValue<VV> extends Tuple2<VV, Long> {
+
+               public void setVertexGroupValue(VV vertexGroupValue) {
+                       this.f0 = vertexGroupValue;
+               }
+
+               public VV getVertexGroupValue() {
+                       return this.f0;
+               }
+
+               public void setVertexGroupCount(Long vertexGroupCount) {
+                       this.f1 = vertexGroupCount;
+               }
+
+               public Long getVertexGroupCount() {
+                       return this.f1;
+               }
+       }
+
+       /**
+        * Value attached to every edge of the summarized graph.
+        *
+        * f0: value shared by all edges of the group
+        * f1: number of edges in the group
+        *
+        * @param <EV> edge value type
+        */
+       @SuppressWarnings("serial")
+       public static final class EdgeValue<EV> extends Tuple2<EV, Long> {
+
+               public void setEdgeGroupValue(EV edgeGroupValue) {
+                       this.f0 = edgeGroupValue;
+               }
+
+               public EV getEdgeGroupValue() {
+                       return this.f0;
+               }
+
+               public void setEdgeGroupCount(Long edgeGroupCount) {
+                       this.f1 = edgeGroupCount;
+               }
+
+               public Long getEdgeGroupCount() {
+                       return this.f1;
+               }
+       }
+
+       /**
+        * A record describing either a single vertex of a vertex group or the group as a whole.
+        * Items with a group count of 0 describe single vertices; items with a non-zero group
+        * count describe whole groups.
+        *
+        * f0: vertex identifier
+        * f1: vertex group representative identifier
+        * f2: vertex group value
+        * f3: vertex group count
+        *
+        * @param <K>   vertex identifier type
+        * @param <VGV> vertex group value type
+        */
+       @SuppressWarnings("serial")
+       public static final class VertexGroupItem<K, VGV> extends Tuple4<K, K, VGV, Long> {
+
+               /** Creates an item in its initial state: null ids and value, group count 0. */
+               public VertexGroupItem() {
+                       reset();
+               }
+
+               public K getVertexId() {
+                       return this.f0;
+               }
+
+               public void setVertexId(K vertexId) {
+                       this.f0 = vertexId;
+               }
+
+               public K getGroupRepresentativeId() {
+                       return this.f1;
+               }
+
+               public void setGroupRepresentativeId(K groupRepresentativeId) {
+                       this.f1 = groupRepresentativeId;
+               }
+
+               public VGV getVertexGroupValue() {
+                       return this.f2;
+               }
+
+               public void setVertexGroupValue(VGV vertexGroupValue) {
+                       this.f2 = vertexGroupValue;
+               }
+
+               public Long getVertexGroupCount() {
+                       return this.f3;
+               }
+
+               public void setVertexGroupCount(Long vertexGroupCount) {
+                       this.f3 = vertexGroupCount;
+               }
+
+               /**
+                * Restores the initial state (null ids and value, group count 0). Needed when an
+                * instance is reused and not every field is overwritten before it is emitted again.
+                */
+               public void reset() {
+                       this.f0 = null;
+                       this.f1 = null;
+                       this.f2 = null;
+                       this.f3 = 0L;
+               }
+       }
+
+       /**
+        * Maps a vertex identifier to the identifier of its vertex group representative.
+        *
+        * f0: vertex identifier
+        * f1: vertex group representative identifier
+        *
+        * @param <K> vertex identifier type
+        */
+       @SuppressWarnings("serial")
+       public static final class VertexWithRepresentative<K> extends Tuple2<K, K> {
+
+               /** @return the identifier of the (input) vertex */
+               public K getVertexId() {
+                       return f0;
+               }
+
+               public void setVertexId(K vertexId) {
+                       f0 = vertexId;
+               }
+
+               /** @return the identifier of the representative of the vertex's group */
+               public K getGroupRepresentativeId() {
+                       return f1;
+               }
+
+               public void setGroupRepresentativeId(K groupRepresentativeId) {
+                       f1 = groupRepresentativeId;
+               }
+       }
+
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Functions
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Turns a group of vertices that share the same value into {@link VertexGroupItem}s.
+        *
+        * Creates one {@link VertexGroupItem} for each group element containing the vertex identifier and the identifier
+        * of the group representative which is the first vertex in the reduce input iterable.
+        *
+        * Creates one {@link VertexGroupItem} representing the whole group that contains the vertex identifier of the
+        * group representative, the vertex group value and the total number of group elements.
+        *
+        * Member items keep a group count of 0, while the group item carries the number of group elements; the
+        * count-based filters applied to this reducer's output use exactly this to separate the two kinds.
+        *
+        * A single {@link VertexGroupItem} instance is reused for every record this reducer emits.
+        *
+        * NOTE(review): member items never set f2 (group value), so it is null after reset() (and presumably null
+        * initially). Flink tuple serializers are generally not null-tolerant -- confirm this is safe if these
+        * records are ever serialized instead of being passed to a chained operator.
+        *
+        * @param <K>   vertex identifier type
+        * @param <VV>  vertex value type
+        */
+       @SuppressWarnings("serial")
+       private static final class VertexGroupReducer<K, VV>
+                       implements GroupReduceFunction<Vertex<K, VV>, VertexGroupItem<K, VV>> {
+
+               // reused for every record emitted by this reducer
+               private final VertexGroupItem<K, VV> reuseVertexGroupItem;
+
+               private VertexGroupReducer() {
+                       this.reuseVertexGroupItem = new VertexGroupItem<>();
+               }
+
+               @Override
+               public void reduce(Iterable<Vertex<K, VV>> values, Collector<VertexGroupItem<K, VV>> out) throws Exception {
+                       K vertexGroupRepresentativeID = null;
+                       long vertexGroupCount = 0L;
+                       VV vertexGroupValue = null;
+                       boolean isFirstElement = true;
+
+                       for (Vertex<K, VV> vertex : values) {
+                               if (isFirstElement) {
+                                       // take final group representative vertex id from first tuple
+                                       vertexGroupRepresentativeID = vertex.getId();
+                                       vertexGroupValue = vertex.getValue();
+                                       isFirstElement = false;
+                               }
+                               // member item: only vertex id and representative id are written; group value and
+                               // count keep their initial/reset state (count 0 marks a member item)
+                               reuseVertexGroupItem.setVertexId(vertex.getId());
+                               reuseVertexGroupItem.setGroupRepresentativeId(vertexGroupRepresentativeID);
+                               out.collect(reuseVertexGroupItem);
+                               vertexGroupCount++;
+                       }
+
+                       // group item: f1 is not rewritten here -- it still holds the representative id set in the loop
+                       createGroupRepresentativeTuple(vertexGroupRepresentativeID, vertexGroupValue, vertexGroupCount);
+                       out.collect(reuseVertexGroupItem);
+                       // clear value and count so the next group's member items start again from count 0
+                       reuseVertexGroupItem.reset();
+               }
+
+               /**
+                * Creates one tuple representing the whole group. This tuple is later used to create a summarized
+                * vertex for each group. Writes f0, f2 and f3 of the reused item; f1 is left untouched.
+                *
+                * @param vertexGroupRepresentativeId group representative vertex identifier
+                * @param vertexGroupValue            group property value
+                * @param vertexGroupCount            total group count
+                */
+               private void createGroupRepresentativeTuple(K vertexGroupRepresentativeId,
+                               VV vertexGroupValue,
+                               Long vertexGroupCount) {
+                       reuseVertexGroupItem.setVertexId(vertexGroupRepresentativeId);
+                       reuseVertexGroupItem.setVertexGroupValue(vertexGroupValue);
+                       reuseVertexGroupItem.setVertexGroupCount(vertexGroupCount);
+               }
+       }
+
+       /**
+        * Creates a summarized edge from a group of edges that share source, target and edge value.
+        * The emitted edge carries the shared value and the number of edges in the group.
+        *
+        * The output {@link Edge} and its {@link EdgeValue} are reused for every emitted record.
+        *
+        * @param <K> vertex identifier type
+        * @param <EV> edge group value type
+        */
+       @SuppressWarnings("serial")
+       private static final class EdgeGroupReducer<K, EV>
+                       implements GroupReduceFunction<Edge<K, EV>, Edge<K, EdgeValue<EV>>> {
+
+               private final Edge<K, EdgeValue<EV>> reuseEdge;
+
+               private final EdgeValue<EV> reuseEdgeValue;
+
+               private EdgeGroupReducer() {
+                       reuseEdge = new Edge<>();
+                       reuseEdgeValue = new EdgeValue<>();
+               }
+
+               @Override
+               public void reduce(Iterable<Edge<K, EV>> values, Collector<Edge<K, EdgeValue<EV>>> out) throws Exception {
+                       K sourceVertexId = null;
+                       K targetVertexId = null;
+                       EV edgeGroupValue = null;
+                       // primitive counter: a boxed Long would allocate a new object on every increment
+                       long edgeGroupCount = 0L;
+
+                       for (Edge<K, EV> edge : values) {
+                               if (edgeGroupCount == 0L) {
+                                       // all edges of a group agree on source, target and value (the grouping keys),
+                                       // so the first edge provides them for the whole group
+                                       sourceVertexId = edge.getSource();
+                                       targetVertexId = edge.getTarget();
+                                       edgeGroupValue = edge.getValue();
+                               }
+                               edgeGroupCount++;
+                       }
+
+                       reuseEdgeValue.setEdgeGroupValue(edgeGroupValue);
+                       reuseEdgeValue.setEdgeGroupCount(edgeGroupCount);
+                       reuseEdge.setSource(sourceVertexId);
+                       reuseEdge.setTarget(targetVertexId);
+                       reuseEdge.setValue(reuseEdgeValue);
+                       out.collect(reuseEdge);
+               }
+       }
+
+       /**
+        * Keeps only the items that describe a whole vertex group, recognizable by a non-zero
+        * group count. Each of them becomes one vertex of the summarized graph.
+        *
+        * @param <K>   vertex identifier type
+        * @param <VV>  vertex value type
+        */
+       @SuppressWarnings("serial")
+       @FunctionAnnotation.ForwardedFields("*->*")
+       private static final class VertexGroupItemToSummarizedVertexFilter<K, VV>
+                       implements FilterFunction<VertexGroupItem<K, VV>> {
+
+               @Override
+               public boolean filter(VertexGroupItem<K, VV> vertexGroupItem) throws Exception {
+                       final long groupCount = vertexGroupItem.getVertexGroupCount();
+                       return groupCount != 0L;
+               }
+       }
+
+       /**
+        * Keeps only the items that describe a single vertex, recognizable by a group count of 0.
+        * They carry the vertex-to-representative mapping used to rewrite edge endpoints.
+        *
+        * @param <K>   vertex identifier type
+        * @param <VV>  vertex value type
+        */
+       @SuppressWarnings("serial")
+       @FunctionAnnotation.ForwardedFields("*->*")
+       private static final class VertexGroupItemToRepresentativeFilter<K, VV>
+                       implements FilterFunction<VertexGroupItem<K, VV>> {
+
+               @Override
+               public boolean filter(VertexGroupItem<K, VV> vertexGroupItem) throws Exception {
+                       return vertexGroupItem.getVertexGroupCount().longValue() == 0L;
+               }
+       }
+
+       /**
+        * Converts a group-describing item into a vertex of the summarized graph. The vertex id
+        * is the item's vertex id; the vertex value holds the group value and the group count.
+        *
+        * The {@link VertexValue} instance is reused across calls, while a new {@link Vertex} is
+        * created for every record.
+        *
+        * @param <K>   vertex identifier type
+        * @param <VV>  vertex value type
+        */
+       @SuppressWarnings("serial")
+       private static final class VertexGroupItemToSummarizedVertexMapper<K, VV>
+                       implements MapFunction<VertexGroupItem<K, VV>, Vertex<K, VertexValue<VV>>> {
+
+               private final VertexValue<VV> reuseSummarizedVertexValue;
+
+               private VertexGroupItemToSummarizedVertexMapper() {
+                       this.reuseSummarizedVertexValue = new VertexValue<>();
+               }
+
+               @Override
+               public Vertex<K, VertexValue<VV>> map(VertexGroupItem<K, VV> value) throws Exception {
+                       reuseSummarizedVertexValue.setVertexGroupCount(value.getVertexGroupCount());
+                       reuseSummarizedVertexValue.setVertexGroupValue(value.getVertexGroupValue());
+                       return new Vertex<>(value.getVertexId(), reuseSummarizedVertexValue);
+               }
+       }
+
+       /**
+        * Creates a {@link VertexWithRepresentative} from a {@link 
VertexGroupItem}.
+        *
+        * @param <K>   vertex identifier type
+        * @param <VV>  vertex value type
+        */
+       @SuppressWarnings("serial")
+       @FunctionAnnotation.ForwardedFields("f0;f1")
+       private static final class 
VertexGroupItemToVertexWithRepresentativeMapper<K, VV>
+                       implements MapFunction<VertexGroupItem<K, VV>, 
VertexWithRepresentative<K>> {
+
+               private final VertexWithRepresentative<K> 
reuseVertexWithRepresentative;
+
+               private VertexGroupItemToVertexWithRepresentativeMapper() {
+                       reuseVertexWithRepresentative = new 
VertexWithRepresentative<>();
+               }
+
+               @Override
+               public VertexWithRepresentative<K> map(VertexGroupItem<K, VV> 
vertexGroupItem) throws Exception {
+                       
reuseVertexWithRepresentative.setVertexId(vertexGroupItem.getVertexId());
+                       
reuseVertexWithRepresentative.setGroupRepresentativeId(vertexGroupItem.getGroupRepresentativeId());
+                       return reuseVertexWithRepresentative;
+               }
+       }
+
+       /**
+        * Replaces the source vertex id with the vertex group representative 
id and adds the edge group value.
+        *
+        * @param <K>   vertex identifier type
+        * @param <EV>  edge value type
+        */
+       @SuppressWarnings("serial")
+       @FunctionAnnotation.ForwardedFieldsFirst("f1")                  // edge 
target id
+       @FunctionAnnotation.ForwardedFieldsSecond("f1->f0") // vertex group id 
-> edge source id
+       private static final class SourceVertexJoinFunction<K, EV>
+                       implements JoinFunction<Edge<K, EV>, 
VertexWithRepresentative<K>, Edge<K, EV>> {
+
+               private final Edge<K, EV> reuseEdge;
+
+               private SourceVertexJoinFunction() {
+                       this.reuseEdge = new Edge<>();
+               }
+
+               @Override
+               public Edge<K, EV> join(Edge<K, EV> edge, 
VertexWithRepresentative<K> vertex) throws Exception {
+                       reuseEdge.setSource(vertex.getGroupRepresentativeId());
+                       reuseEdge.setTarget(edge.getTarget());
+                       reuseEdge.setValue(edge.getValue());
+                       return reuseEdge;
+               }
+       }
+
+       /**
+        * Replaces the target vertex id with the vertex group identifier.
+        *
+        * @param <K>   vertex identifier type
+        * @param <EV>  edge group value type
+        */
+       @SuppressWarnings("serial")
+       @FunctionAnnotation.ForwardedFieldsFirst("f0;f2") // source vertex id, 
edge group value
+       @FunctionAnnotation.ForwardedFieldsSecond("f1")         // vertex group 
id -> edge target id
+       private static final class TargetVertexJoinFunction<K, EV>
+                       implements JoinFunction<Edge<K, EV>, 
VertexWithRepresentative<K>, Edge<K, EV>> {
+               @Override
+               public Edge<K, EV> join(Edge<K, EV> edge,
+                                                                               
                                VertexWithRepresentative<K> 
vertexRepresentative) throws Exception {
+                       
edge.setTarget(vertexRepresentative.getGroupRepresentativeId());
+                       return edge;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
new file mode 100644
index 0000000..abb4511
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SummarizationData;
+import org.apache.flink.graph.library.Summarization;
+import org.apache.flink.graph.library.Summarization.EdgeValue;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class SummarizationITCase extends MultipleProgramsTestBase {
+
+       private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
+
+       private static final Pattern ID_SEPARATOR = Pattern.compile(",");
+
+       public SummarizationITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testWithVertexAndEdgeValues() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, String, String> input = Graph.fromDataSet(
+                               SummarizationData.getVertices(env),
+                               SummarizationData.getEdges(env),
+                               env
+               );
+
+               List<Vertex<Long, Summarization.VertexValue<String>>> 
summarizedVertices = Lists.newArrayList();
+               List<Edge<Long, EdgeValue<String>>> summarizedEdges = 
Lists.newArrayList();
+
+               Graph<Long, Summarization.VertexValue<String>, 
EdgeValue<String>> output =
+                               input.run(new Summarization<Long, String, 
String>());
+
+               output.getVertices().output(new 
LocalCollectionOutputFormat<>(summarizedVertices));
+               output.getEdges().output(new 
LocalCollectionOutputFormat<>(summarizedEdges));
+
+               env.execute();
+
+               validateVertices(SummarizationData.EXPECTED_VERTICES, 
summarizedVertices);
+               validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, 
summarizedEdges);
+       }
+
+       @Test
+       public void testWithVertexAndAbsentEdgeValues() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, String, NullValue> input = Graph.fromDataSet(
+                               SummarizationData.getVertices(env),
+                               SummarizationData.getEdgesWithAbsentValues(env),
+                               env
+               );
+
+               List<Vertex<Long, Summarization.VertexValue<String>>> 
summarizedVertices = Lists.newArrayList();
+               List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = 
Lists.newArrayList();
+
+               Graph<Long, Summarization.VertexValue<String>, 
EdgeValue<NullValue>> output =
+                               input.run(new Summarization<Long, String, 
NullValue>());
+
+               output.getVertices().output(new 
LocalCollectionOutputFormat<>(summarizedVertices));
+               output.getEdges().output(new 
LocalCollectionOutputFormat<>(summarizedEdges));
+
+               env.execute();
+
+               validateVertices(SummarizationData.EXPECTED_VERTICES, 
summarizedVertices);
+               validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, 
summarizedEdges);
+       }
+
+       private void validateVertices(String[] expectedVertices,
+                                                                               
                                                List<Vertex<Long, 
Summarization.VertexValue<String>>> actualVertices) {
+               Arrays.sort(expectedVertices);
+               Collections.sort(actualVertices, new Comparator<Vertex<Long, 
Summarization.VertexValue<String>>>() {
+                       @Override
+                       public int compare(Vertex<Long, 
Summarization.VertexValue<String>> o1,
+                                                                               
                 Vertex<Long, Summarization.VertexValue<String>> o2) {
+                               int result = o1.getId().compareTo(o2.getId());
+                               if (result == 0) {
+                                       result = 
o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+                               }
+                               if (result == 0) {
+                                       result = 
o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+                               }
+                               if (result == 0) {
+                                       result = 
o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+                               }
+                               return result;
+                       }
+               });
+
+               for (int i = 0; i < expectedVertices.length; i++) {
+                       validateVertex(expectedVertices[i], 
actualVertices.get(i));
+               }
+       }
+
+       private <EV extends Comparable<EV>> void validateEdges(String[] 
expectedEdges,
+                                                                               
                                 List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+               Arrays.sort(expectedEdges);
+               Collections.sort(actualEdges, new Comparator<Edge<Long, 
EdgeValue<EV>>> () {
+
+                       @Override
+                       public int compare(Edge<Long, EdgeValue<EV>> o1, 
Edge<Long, EdgeValue<EV>> o2) {
+                               int result = 
o1.getSource().compareTo(o2.getSource());
+                               if (result == 0) {
+                                       result = 
o1.getTarget().compareTo(o2.getTarget());
+                               }
+                               if (result == 0) {
+                                       result = 
o1.getTarget().compareTo(o2.getTarget());
+                               }
+                               if (result == 0) {
+                                       result = 
o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
+                               }
+                               if (result == 0) {
+                                       result = 
o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
+                               }
+                               return result;
+                       }
+               });
+
+               for (int i = 0; i < expectedEdges.length; i++) {
+                       validateEdge(expectedEdges[i], actualEdges.get(i));
+               }
+       }
+
+       private void validateVertex(String expected, Vertex<Long, 
Summarization.VertexValue<String>> actual) {
+               String[] tokens = TOKEN_SEPARATOR.split(expected);
+               
assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
+               assertEquals(getGroupValue(tokens[1]), 
actual.getValue().getVertexGroupValue());
+               assertEquals(getGroupCount(tokens[1]), 
actual.getValue().getVertexGroupCount());
+       }
+
+       private <EV> void validateEdge(String expected, Edge<Long, 
EdgeValue<EV>> actual) {
+               String[] tokens = TOKEN_SEPARATOR.split(expected);
+               
assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
+               
assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
+               assertEquals(getGroupValue(tokens[2]), 
actual.getValue().getEdgeGroupValue().toString());
+               assertEquals(getGroupCount(tokens[2]), 
actual.getValue().getEdgeGroupCount());
+       }
+
+       private List<Long> getListFromIdRange(String idRange) {
+               List<Long> result = Lists.newArrayList();
+               for (String id : ID_SEPARATOR.split(idRange)) {
+                       result.add(Long.parseLong(id));
+               }
+               return result;
+       }
+
+       private String getGroupValue(String token) {
+               return ID_SEPARATOR.split(token)[0];
+       }
+
+       private Long getGroupCount(String token) {
+               return Long.valueOf(ID_SEPARATOR.split(token)[1]);
+       }
+}

Reply via email to