Repository: flink Updated Branches: refs/heads/master 3dc7423c4 -> daa357aca
[FLINK-2411] [gelly] Add Summarization Algorithm * implemented algorithm * implemented integration tests * updated gelly guide This closes #1269 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daa357ac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daa357ac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daa357ac Branch: refs/heads/master Commit: daa357aca1fe7745eb7809ac384c5fdc14bd74db Parents: 3dc7423 Author: Martin Junghanns <[email protected]> Authored: Tue Oct 20 09:28:40 2015 +0200 Committer: vasia <[email protected]> Committed: Mon Oct 26 11:12:03 2015 +0100 ---------------------------------------------------------------------- docs/libs/gelly_guide.md | 25 + .../main/java/org/apache/flink/graph/Graph.java | 6 +- .../graph/example/utils/SummarizationData.java | 134 +++++ .../flink/graph/library/Summarization.java | 521 +++++++++++++++++++ .../graph/test/library/SummarizationITCase.java | 189 +++++++ 5 files changed, 873 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/docs/libs/gelly_guide.md ---------------------------------------------------------------------- diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 646ec7f..59e1a3b 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -1437,6 +1437,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths) * [GSA Triangle Count](#gsa-triangle-count) * [Triangle Enumerator](#triangle-enumerator) +* [Summarization](#summarization) Gelly's library methods can be used by simply calling the `run()` method on the input graph: @@ -1617,4 +1618,28 @@ This implementation extends the basic algorithm by computing output degrees of e The algorithm takes a directed graph as input and outputs a 
`DataSet` of `Tuple3`. The Vertex ID type has to be `Comparable`. Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of the vertices forming the triangle. +### Summarization + +#### Overview +The summarization algorithm computes a condensed version of the input graph by grouping vertices and edges based on +their values. In doing so, the algorithm helps to uncover insights about patterns and distributions in the graph. +One possible use case is the visualization of communities where the whole graph is too large and needs to be summarized +based on the community identifier stored at a vertex. + +#### Details +In the resulting graph, each vertex represents a group of vertices that share the same value. An edge that connects a +vertex with itself represents all edges with the same edge value that connect vertices from the same vertex group. An +edge between different vertices in the output graph represents all edges with the same edge value between members of +different vertex groups in the input graph. + +The algorithm is implemented using Flink data operators. First, vertices are grouped by their value and a representative +is chosen from each group. For every edge, the source and target vertex identifiers are replaced with the corresponding +group representatives, and the resulting edges are grouped by source, target and edge value. Output vertices and edges +are created from their corresponding groupings. + +#### Usage +The algorithm takes a directed graph with vertex values (and optionally edge values) as input and outputs a new graph where each +vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each +vertex and edge in the output graph stores the common group value and the number of represented elements.
+ [Back to top](#top) http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 6015be4..68bfb5b 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1370,7 +1370,7 @@ public class Graph<K, VV, EV> { DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0) .with(new VerticesRemovalCoGroup<K, VV>()); - DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0) + DataSet <Edge< K, EV>> newEdges = newVertices.join(getEdges()).where(0).equalTo(0) // if the edge source was removed, the edge will also be removed .with(new ProjectEdgeToBeRemoved<K, VV, EV>()) // if the edge target was removed, the edge will also be removed @@ -1485,7 +1485,9 @@ public class Graph<K, VV, EV> { /** * Performs Difference on the vertex and edge sets of the input graphs - * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed + * removes common vertices and edges. 
If a source/target vertex is removed, + * its corresponding edge will also be removed + * * @param graph the graph to perform difference with * @return a new graph where the common vertices and edges have been removed */ http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java new file mode 100644 index 0000000..88f76cc --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.List;
+
+/**
+ * Input graph and expected results used by the Summarization tests.
+ * <p>
+ * The input graph consists of six vertices (ids 0 to 5) and ten directed edges.
+ */
+public class SummarizationData {
+
+	private SummarizationData() {}
+
+	/**
+	 * Expected summarized vertices. The identifier of a summarized vertex may be the id of any
+	 * vertex it summarizes, so every admissible id is listed.
+	 *
+	 * Format:
+	 *
+	 * "possible-id[,possible-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_VERTICES = new String[] {
+		"0,1;A,2",
+		"2,3,4;B,3",
+		"5;C,1"
+	};
+
+	/**
+	 * Expected summarized edges when the input edges carry {@link String} values.
+	 *
+	 * Format:
+	 *
+	 * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] {
+		"0,1;0,1;A,2",
+		"0,1;2,3,4;A,1",
+		"2,3,4;0,1;A,1",
+		"2,3,4;0,1;C,2",
+		"2,3,4;2,3,4;B,2",
+		"5;2,3,4;D,2"
+	};
+
+	/**
+	 * Expected summarized edges when every input edge carries {@link NullValue}.
+	 *
+	 * Format:
+	 *
+	 * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_EDGES_ABSENT_VALUES = new String[] {
+		"0,1;0,1;(null),2",
+		"0,1;2,3,4;(null),1",
+		"2,3,4;0,1;(null),3",
+		"2,3,4;2,3,4;(null),2",
+		"5;2,3,4;(null),2"
+	};
+
+	/**
+	 * Builds the input vertices: ids 0 to 5 with the {@link String} values A, A, B, B, B, C.
+	 *
+	 * @param env execution environment
+	 * @return vertex data set with string values
+	 */
+	public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) {
+		// the index into this array is the vertex id
+		final String[] valueById = {"A", "A", "B", "B", "B", "C"};
+
+		List<Vertex<Long, String>> vertices = Lists.newArrayListWithExpectedSize(valueById.length);
+		for (int id = 0; id < valueById.length; id++) {
+			vertices.add(new Vertex<>((long) id, valueById[id]));
+		}
+		return env.fromCollection(vertices);
+	}
+
+	/**
+	 * Builds the ten input edges, each carrying a {@link String} value.
+	 *
+	 * @param env execution environment
+	 * @return edge data set with string values
+	 */
+	public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) {
+		// row i holds {source id, target id} of edge i; edgeValues[i] is that edge's value
+		final long[][] endpoints = {
+			{0L, 1L}, {1L, 0L}, {1L, 2L}, {2L, 1L}, {2L, 3L},
+			{3L, 2L}, {4L, 0L}, {4L, 1L}, {5L, 2L}, {5L, 3L}
+		};
+		final String[] edgeValues = {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
+
+		List<Edge<Long, String>> edges = Lists.newArrayListWithExpectedSize(endpoints.length);
+		for (int i = 0; i < endpoints.length; i++) {
+			edges.add(new Edge<>(endpoints[i][0], endpoints[i][1], edgeValues[i]));
+		}
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Builds the same edges as {@link #getEdges(ExecutionEnvironment)}, but with {@link NullValue}
+	 * as the edge value.
+	 *
+	 * @param env execution environment
+	 * @return edge data set with null values
+	 */
+	public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) {
+		return getEdges(env).map(new DropEdgeValue());
+	}
+
+	/**
+	 * Keeps source and target of an edge and replaces its value with {@link NullValue}.
+	 */
+	@SuppressWarnings("serial")
+	private static final class DropEdgeValue
+			implements MapFunction<Edge<Long, String>, Edge<Long, NullValue>> {
+
+		@Override
+		public Edge<Long, NullValue> map(Edge<Long, String> edge) throws Exception {
+			return new Edge<>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
new file mode 100644
index 0000000..0dcdc1f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+/**
+ * The summarization algorithm computes a condensed version of the input graph<br>
+ * by grouping vertices and edges based on their values. By doing this, the<br>
+ * algorithm helps to uncover insights about patterns and distributions in the<br>
+ * graph.
+ * <p>
+ * In the resulting graph, each vertex represents a group of vertices that share the<br>
+ * same vertex value. An edge that connects a vertex with itself represents all edges<br>
+ * with the same edge value that connect vertices inside that group. An edge between<br>
+ * two different vertices in the output graph represents all edges with the same edge<br>
+ * value between members of those two groups in the input graph.
+ * <p>
+ * Consider the following example:
+ * <p>
+ * Input graph:
+ * <p>
+ * Vertices (id, value):<br>
+ * (0, "A")<br>
+ * (1, "A")<br>
+ * (2, "B")<br>
+ * (3, "B")<br>
+ * <p>
+ * Edges (source, target, value):<br>
+ * (0,1, null)<br>
+ * (1,0, null)<br>
+ * (1,2, null)<br>
+ * (2,1, null)<br>
+ * (2,3, null)<br>
+ * (3,2, null)<br>
+ * <p>
+ * Output graph:
+ * <p>
+ * Vertices (id, (value, count)):<br>
+ * (0, ("A", 2)) // 0 and 1 <br>
+ * (2, ("B", 2)) // 2 and 3 <br>
+ * <p>
+ * Edges (source, target, (value, count)):<br>
+ * (0, 0, (null, 2)) // (0,1) and (1,0) <br>
+ * (2, 2, (null, 2)) // (2,3) and (3,2) <br>
+ * (0, 2, (null, 1)) // (1,2) <br>
+ * (2, 0, (null, 1)) // (2,1) <br>
+ * <p>
+ * Note that this implementation is non-deterministic in the way that it assigns<br>
+ * identifiers to summarized vertices. However, it is guaranteed that the identifier<br>
+ * is one of the represented vertex identifiers.
+ *
+ * @param <K> vertex identifier type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class Summarization<K, VV, EV>
+	implements GraphAlgorithm<K, VV, EV,
+		Graph<K, Summarization.VertexValue<VV>, Summarization.EdgeValue<EV>>> {
+
+	/**
+	 * Summarizes the input graph.
+	 * <p>
+	 * Edges are re-mapped to group representatives with inner joins. Edges whose source or
+	 * target id has no matching vertex in the input therefore do not appear in the result.
+	 *
+	 * @param input graph to summarize
+	 * @return graph with one vertex per distinct vertex value and one edge per distinct
+	 *         (source group, target group, edge value) combination
+	 * @throws Exception if building the Flink program fails
+	 */
+	@Override
+	public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV> input) throws Exception {
+		// -------------------------
+		// build summarized vertices
+		// -------------------------
+
+		// group vertices by value
+		UnsortedGrouping<Vertex<K, VV>> vertexUnsortedGrouping = input.getVertices()
+				.groupBy(1);
+		// reduce vertex group and create vertex group items
+		GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> vertexGroupItems = vertexUnsortedGrouping
+				.reduceGroup(new VertexGroupReducer<K, VV>());
+		// create summarized vertices
+		DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems
+				.filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>())
+				.map(new VertexGroupItemToSummarizedVertexMapper<K, VV>());
+		// create mapping between vertices and their representative
+		DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems
+				.filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
+				.map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
+
+		// -------------------------
+		// build summarized edges
+		// -------------------------
+
+		// join edges with vertex representatives and update source and target identifiers
+		DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
+				.join(vertexToRepresentativeMap)
+				.where(0) 	// source vertex id
+				.equalTo(0) // vertex id
+				.with(new SourceVertexJoinFunction<K, EV>())
+				.join(vertexToRepresentativeMap)
+				.where(1) 	// target vertex id
+				.equalTo(0) // vertex id
+				.with(new TargetVertexJoinFunction<K, EV>());
+		// create summarized edges
+		DataSet<Edge<K, EdgeValue<EV>>> summarizedEdges = edgesForGrouping
+				.groupBy(0, 1, 2) // group by source id (0), target id (1) and edge value (2)
+				.reduceGroup(new EdgeGroupReducer<K, EV>());
+
+		return Graph.fromDataSet(summarizedVertices, summarizedEdges, input.getContext());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Tuple Types
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Value that is stored at a summarized vertex.
+	 *
+	 * f0: vertex group value
+	 * f1: vertex group count
+	 *
+	 * @param <VV> vertex value type
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexValue<VV> extends Tuple2<VV, Long> {
+
+		public VV getVertexGroupValue() {
+			return f0;
+		}
+
+		public void setVertexGroupValue(VV vertexGroupValue) {
+			f0 = vertexGroupValue;
+		}
+
+		public Long getVertexGroupCount() {
+			return f1;
+		}
+
+		public void setVertexGroupCount(Long vertexGroupCount) {
+			f1 = vertexGroupCount;
+		}
+	}
+
+	/**
+	 * Value that is stored at a summarized edge.
+	 *
+	 * f0: edge group value
+	 * f1: edge group count
+	 *
+	 * @param <EV> edge value type
+	 */
+	@SuppressWarnings("serial")
+	public static final class EdgeValue<EV> extends Tuple2<EV, Long> {
+
+		public EV getEdgeGroupValue() {
+			return f0;
+		}
+
+		public void setEdgeGroupValue(EV edgeGroupValue) {
+			f0 = edgeGroupValue;
+		}
+
+		public Long getEdgeGroupCount() {
+			return f1;
+		}
+
+		public void setEdgeGroupCount(Long edgeGroupCount) {
+			f1 = edgeGroupCount;
+		}
+	}
+
+	/**
+	 * Represents a single vertex in a vertex group, or (when the count is greater than zero)
+	 * the whole group.
+	 *
+	 * f0: vertex identifier
+	 * f1: vertex group representative identifier
+	 * f2: vertex group value
+	 * f3: vertex group count (0 for items that represent a single vertex)
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <VGV> vertex group value type
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexGroupItem<K, VGV> extends Tuple4<K, K, VGV, Long> {
+
+		public VertexGroupItem() {
+			setVertexGroupCount(0L);
+		}
+
+		public K getVertexId() {
+			return f0;
+		}
+
+		public void setVertexId(K vertexId) {
+			f0 = vertexId;
+		}
+
+		public K getGroupRepresentativeId() {
+			return f1;
+		}
+
+		public void setGroupRepresentativeId(K groupRepresentativeId) {
+			f1 = groupRepresentativeId;
+		}
+
+		public VGV getVertexGroupValue() {
+			return f2;
+		}
+
+		public void setVertexGroupValue(VGV vertexGroupValue) {
+			f2 = vertexGroupValue;
+		}
+
+		public Long getVertexGroupCount() {
+			return f3;
+		}
+
+		public void setVertexGroupCount(Long vertexGroupCount) {
+			f3 = vertexGroupCount;
+		}
+
+		/**
+		 * Resets the fields to initial values. This is necessary if the tuples are reused and not all fields were modified.
+		 */
+		public void reset() {
+			f0 = null;
+			f1 = null;
+			f2 = null;
+			f3 = 0L;
+		}
+	}
+
+	/**
+	 * Represents a vertex identifier and its corresponding vertex group identifier.
+	 *
+	 * @param <K> vertex identifier type
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexWithRepresentative<K> extends Tuple2<K, K> {
+
+		public void setVertexId(K vertexId) {
+			f0 = vertexId;
+		}
+
+		public K getGroupRepresentativeId() {
+			return f1;
+		}
+
+		public void setGroupRepresentativeId(K groupRepresentativeId) {
+			f1 = groupRepresentativeId;
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates one {@link VertexGroupItem} for each group element. The item contains the vertex identifier,
+	 * the identifier of the group representative (the first vertex in the reduce input iterable) and the
+	 * group value. Its group count is zero.
+	 *
+	 * Creates one {@link VertexGroupItem} representing the whole group that contains the vertex identifier of the
+	 * group representative, the vertex group value and the total number of group elements.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <VV> vertex value type
+	 */
+	@SuppressWarnings("serial")
+	private static final class VertexGroupReducer<K, VV>
+		implements GroupReduceFunction<Vertex<K, VV>, VertexGroupItem<K, VV>> {
+
+		private final VertexGroupItem<K, VV> reuseVertexGroupItem;
+
+		private VertexGroupReducer() {
+			this.reuseVertexGroupItem = new VertexGroupItem<>();
+		}
+
+		@Override
+		public void reduce(Iterable<Vertex<K, VV>> values, Collector<VertexGroupItem<K, VV>> out) throws Exception {
+			K vertexGroupRepresentativeID = null;
+			long vertexGroupCount = 0L;
+			VV vertexGroupValue = null;
+			boolean isFirstElement = true;
+
+			for (Vertex<K, VV> vertex : values) {
+				if (isFirstElement) {
+					// take final group representative vertex id from first tuple
+					vertexGroupRepresentativeID = vertex.getId();
+					vertexGroupValue = vertex.getValue();
+					isFirstElement = false;
+				}
+				reuseVertexGroupItem.setVertexId(vertex.getId());
+				reuseVertexGroupItem.setGroupRepresentativeId(vertexGroupRepresentativeID);
+				// Downstream functions ignore the group value of per-vertex items, but it must not be
+				// left null: this operator has two consumers, so its output may be serialized, and tuple
+				// serializers of many field types (e.g. Long) cannot write null fields.
+				reuseVertexGroupItem.setVertexGroupValue(vertexGroupValue);
+				out.collect(reuseVertexGroupItem);
+				vertexGroupCount++;
+			}
+
+			createGroupRepresentativeTuple(vertexGroupRepresentativeID, vertexGroupValue, vertexGroupCount);
+			out.collect(reuseVertexGroupItem);
+			reuseVertexGroupItem.reset();
+		}
+
+		/**
+		 * Creates one tuple representing the whole group. This tuple is later used to create a summarized vertex for each
+		 * group. The group representative id (f1) keeps the value set while iterating the group.
+		 *
+		 * @param vertexGroupRepresentativeId group representative vertex identifier
+		 * @param vertexGroupValue group property value
+		 * @param vertexGroupCount total group count
+		 */
+		private void createGroupRepresentativeTuple(K vertexGroupRepresentativeId,
+			VV vertexGroupValue,
+			Long vertexGroupCount) {
+			reuseVertexGroupItem.setVertexId(vertexGroupRepresentativeId);
+			reuseVertexGroupItem.setVertexGroupValue(vertexGroupValue);
+			reuseVertexGroupItem.setVertexGroupCount(vertexGroupCount);
+		}
+	}
+
+	/**
+	 * Creates a summarized edge from a group of edges. Counts the number of elements in the group.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <EV> edge group value type
+	 */
+	@SuppressWarnings("serial")
+	private static final class EdgeGroupReducer<K, EV>
+		implements GroupReduceFunction<Edge<K, EV>, Edge<K, EdgeValue<EV>>> {
+
+		private final Edge<K, EdgeValue<EV>> reuseEdge;
+
+		private final EdgeValue<EV> reuseEdgeValue;
+
+		private EdgeGroupReducer() {
+			reuseEdge = new Edge<>();
+			reuseEdgeValue = new EdgeValue<>();
+		}
+
+		@Override
+		public void reduce(Iterable<Edge<K, EV>> values, Collector<Edge<K, EdgeValue<EV>>> out) throws Exception {
+			K sourceVertexId = null;
+			K targetVertexId = null;
+			EV edgeGroupValue = null;
+			Long edgeGroupCount = 0L;
+			boolean isFirstElement = true;
+
+			for (Edge<K, EV> edge : values) {
+				if (isFirstElement) {
+					sourceVertexId = edge.getSource();
+					targetVertexId = edge.getTarget();
+					edgeGroupValue = edge.getValue();
+					isFirstElement = false;
+				}
+				edgeGroupCount++;
+			}
+			reuseEdgeValue.setEdgeGroupValue(edgeGroupValue);
+			reuseEdgeValue.setEdgeGroupCount(edgeGroupCount);
+			reuseEdge.setSource(sourceVertexId);
+			reuseEdge.setTarget(targetVertexId);
+			reuseEdge.setValue(reuseEdgeValue);
+			out.collect(reuseEdge);
+		}
+	}
+
+	/**
+	 * Filter tuples that are representing a vertex group. They are used to create new summarized vertices and have a
+	 * group count greater than zero.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <VV> vertex value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFields("*->*")
+	private static final class VertexGroupItemToSummarizedVertexFilter<K, VV>
+		implements FilterFunction<VertexGroupItem<K, VV>> {
+
+		@Override
+		public boolean filter(VertexGroupItem<K, VV> vertexGroupItem) throws Exception {
+			return !vertexGroupItem.getVertexGroupCount().equals(0L);
+		}
+	}
+
+	/**
+	 * Filter tuples that are representing a single vertex. They are used to update the source and target vertex
+	 * identifiers at the edges.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <VV> vertex value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFields("*->*")
+	private static final class VertexGroupItemToRepresentativeFilter<K, VV>
+		implements FilterFunction<VertexGroupItem<K, VV>> {
+
+		@Override
+		public boolean filter(VertexGroupItem<K, VV> vertexGroupItem) throws Exception {
+			return vertexGroupItem.getVertexGroupCount().equals(0L);
+		}
+	}
+
+	/**
+	 * Creates a new vertex representing a vertex group. The vertex stores the group value and the number of vertices in
+	 * the group.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <VV> vertex value type
+	 */
+	@SuppressWarnings("serial")
+	private static final class VertexGroupItemToSummarizedVertexMapper<K, VV>
+		implements MapFunction<VertexGroupItem<K, VV>, Vertex<K, VertexValue<VV>>> {
+
+		private final VertexValue<VV> reuseSummarizedVertexValue;
+
+		private VertexGroupItemToSummarizedVertexMapper() {
+			reuseSummarizedVertexValue = new VertexValue<>();
+		}
+
+		@Override
+		public Vertex<K, VertexValue<VV>> map(VertexGroupItem<K, VV> value) throws Exception {
+			K vertexId = value.getVertexId();
+			reuseSummarizedVertexValue.setVertexGroupValue(value.getVertexGroupValue());
+			reuseSummarizedVertexValue.setVertexGroupCount(value.getVertexGroupCount());
+			return new Vertex<>(vertexId, reuseSummarizedVertexValue);
+		}
+	}
+
+	/**
+	 * Creates a {@link VertexWithRepresentative} from a {@link VertexGroupItem}.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <VV> vertex value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFields("f0;f1")
+	private static final class VertexGroupItemToVertexWithRepresentativeMapper<K, VV>
+		implements MapFunction<VertexGroupItem<K, VV>, VertexWithRepresentative<K>> {
+
+		private final VertexWithRepresentative<K> reuseVertexWithRepresentative;
+
+		private VertexGroupItemToVertexWithRepresentativeMapper() {
+			reuseVertexWithRepresentative = new VertexWithRepresentative<>();
+		}
+
+		@Override
+		public VertexWithRepresentative<K> map(VertexGroupItem<K, VV> vertexGroupItem) throws Exception {
+			reuseVertexWithRepresentative.setVertexId(vertexGroupItem.getVertexId());
+			reuseVertexWithRepresentative.setGroupRepresentativeId(vertexGroupItem.getGroupRepresentativeId());
+			return reuseVertexWithRepresentative;
+		}
+	}
+
+	/**
+	 * Replaces the source vertex id with the vertex group representative id and keeps target id and edge value.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <EV> edge value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFieldsFirst("f1;f2") 	// edge target id, edge value
+	@FunctionAnnotation.ForwardedFieldsSecond("f1->f0")	// vertex group id -> edge source id
+	private static final class SourceVertexJoinFunction<K, EV>
+		implements JoinFunction<Edge<K, EV>, VertexWithRepresentative<K>, Edge<K, EV>> {
+
+		private final Edge<K, EV> reuseEdge;
+
+		private SourceVertexJoinFunction() {
+			this.reuseEdge = new Edge<>();
+		}
+
+		@Override
+		public Edge<K, EV> join(Edge<K, EV> edge, VertexWithRepresentative<K> vertex) throws Exception {
+			reuseEdge.setSource(vertex.getGroupRepresentativeId());
+			reuseEdge.setTarget(edge.getTarget());
+			reuseEdge.setValue(edge.getValue());
+			return reuseEdge;
+		}
+	}
+
+	/**
+	 * Replaces the target vertex id with the vertex group identifier.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <EV> edge group value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFieldsFirst("f0;f2") 	// source vertex id, edge group value
+	@FunctionAnnotation.ForwardedFieldsSecond("f1") 	// vertex group id -> edge target id
+	private static final class TargetVertexJoinFunction<K, EV>
+		implements JoinFunction<Edge<K, EV>, VertexWithRepresentative<K>, Edge<K, EV>> {
+		@Override
+		public Edge<K, EV> join(Edge<K, EV> edge,
+								VertexWithRepresentative<K> vertexRepresentative) throws Exception {
+			edge.setTarget(vertexRepresentative.getGroupRepresentativeId());
+			return edge;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java new
file mode 100644
index 0000000..abb4511
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SummarizationData;
+import org.apache.flink.graph.library.Summarization;
+import org.apache.flink.graph.library.Summarization.EdgeValue;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class SummarizationITCase extends MultipleProgramsTestBase {
+
+	private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
+
+	private static final Pattern ID_SEPARATOR = Pattern.compile(",");
+
+	public SummarizationITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testWithVertexAndEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, String> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdges(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
+				input.run(new Summarization<Long, String, String>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
+	}
+
+	@Test
+	public void testWithVertexAndAbsentEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, NullValue> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdgesWithAbsentValues(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
+				input.run(new Summarization<Long, String, NullValue>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
+	}
+
+	/**
+	 * Checks that the actual vertices match the expected descriptions one-to-one.
+	 * <p>
+	 * Both sides are sorted so that position i of the expected array describes actual vertex i.
+	 * The expected array is copied before sorting because it is a shared constant of
+	 * {@link SummarizationData}.
+	 */
+	private void validateVertices(String[] expectedVertices,
+			List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
+		String[] sortedExpected = expectedVertices.clone();
+		Arrays.sort(sortedExpected);
+		// without this check, additional (unexpected) output vertices would go unnoticed
+		assertEquals("number of summarized vertices", sortedExpected.length, actualVertices.size());
+
+		Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
+			@Override
+			public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
+					Vertex<Long, Summarization.VertexValue<String>> o2) {
+				int result = o1.getId().compareTo(o2.getId());
+				if (result == 0) {
+					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+				}
+				if (result == 0) {
+					result = o1.getValue().getVertexGroupCount().compareTo(o2.getValue().getVertexGroupCount());
+				}
+				return result;
+			}
+		});
+
+		for (int i = 0; i < sortedExpected.length; i++) {
+			validateVertex(sortedExpected[i], actualVertices.get(i));
+		}
+	}
+
+	/**
+	 * Checks that the actual edges match the expected descriptions one-to-one.
+	 * <p>
+	 * Both sides are sorted so that position i of the expected array describes actual edge i.
+	 * The expected array is copied before sorting because it is a shared constant of
+	 * {@link SummarizationData}.
+	 */
+	private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
+			List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+		String[] sortedExpected = expectedEdges.clone();
+		Arrays.sort(sortedExpected);
+		// without this check, additional (unexpected) output edges would go unnoticed
+		assertEquals("number of summarized edges", sortedExpected.length, actualEdges.size());
+
+		Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>> () {
+
+			@Override
+			public int compare(Edge<Long, EdgeValue<EV>> o1, Edge<Long, EdgeValue<EV>> o2) {
+				int result = o1.getSource().compareTo(o2.getSource());
+				if (result == 0) {
+					result = o1.getTarget().compareTo(o2.getTarget());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
+				}
+				return result;
+			}
+		});
+
+		for (int i = 0; i < sortedExpected.length; i++) {
+			validateEdge(sortedExpected[i], actualEdges.get(i));
+		}
+	}
+
+	private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
+		assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
+		assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
+	}
+
+	private <EV> void validateEdge(String expected, Edge<Long, EdgeValue<EV>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
+		assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
+		assertEquals(getGroupValue(tokens[2]), actual.getValue().getEdgeGroupValue().toString());
+		assertEquals(getGroupCount(tokens[2]), actual.getValue().getEdgeGroupCount());
+	}
+
+	/** Parses a comma-separated list of admissible vertex ids, e.g. "2,3,4". */
+	private List<Long> getListFromIdRange(String idRange) {
+		List<Long> result = Lists.newArrayList();
+		for (String id : ID_SEPARATOR.split(idRange)) {
+			result.add(Long.parseLong(id));
+		}
+		return result;
+	}
+
+	/** Returns the group value part of a "value,count" token. */
+	private String getGroupValue(String token) {
+		return ID_SEPARATOR.split(token)[0];
+	}
+
+	/** Returns the group count part of a "value,count" token. */
+	private Long getGroupCount(String token) {
+		return Long.valueOf(ID_SEPARATOR.split(token)[1]);
+	}
+}
