Repository: giraph Updated Branches: refs/heads/trunk 3793c9ef6 -> 77a24a9fc
GIRAPH-1079: Add triangle counting example Summary: Just moved from internal Test Plan: mvn verify Differential Revision: https://reviews.facebook.net/D60057 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/77a24a9f Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/77a24a9f Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/77a24a9f Branch: refs/heads/trunk Commit: 77a24a9fcd3b01961f881143e485b4be0826c139 Parents: 3793c9e Author: Maja Kabiljo <[email protected]> Authored: Mon Jun 27 10:56:02 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Jun 28 11:55:44 2016 -0700 ---------------------------------------------------------------------- .../UndirectedTriangleCountingBlockFactory.java | 157 +++++++++++++++++++ .../library/triangles/package-info.java | 21 +++ .../TestUndirectedTriangleCounting.java | 82 ++++++++++ 3 files changed, 260 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/77a24a9f/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/UndirectedTriangleCountingBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/UndirectedTriangleCountingBlockFactory.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/UndirectedTriangleCountingBlockFactory.java new file mode 100644 index 0000000..90360d6 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/UndirectedTriangleCountingBlockFactory.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.triangles; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.IfBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.prepare_graph.PrepareGraphPieces; +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.BooleanConfOption; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.LongNullHashSetEdges; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.collections.array.WLongArrayList; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; + +import it.unimi.dsi.fastutil.longs.LongListIterator; + +/** + * Count triangles in undirected graph. + * Assumes that edges in both directions are present, or optionally makes + * graph symmetric first. + */ +public class UndirectedTriangleCountingBlockFactory + extends AbstractBlockFactory<Object> { + /** + * Set to true to make graph symmetric before calculation if it's not already + */ + public static final BooleanConfOption MAKE_GRAPH_SYMMETRIC = + new BooleanConfOption("giraph.triangleCounting.makeGraphSymmetric", + false, "Whether it's needed to first make graph symmetric or not"); + + @Override + protected Class<? extends WritableComparable> getVertexIDClass( + GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<? extends Writable> getVertexValueClass( + GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<? extends Writable> getEdgeValueClass( + GiraphConfiguration conf) { + return NullWritable.class; + } + + @Override + public Block createBlock(GiraphConfiguration conf) { + return new SequenceBlock( + new IfBlock(() -> MAKE_GRAPH_SYMMETRIC.get(conf), + PrepareGraphPieces.makeSymmetricUnweighted(LongTypeOps.INSTANCE)), + new UndirectedCountTrianglesPiece()); + } + + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected void additionalInitConfig(GiraphConfiguration conf) { + conf.setOutEdgesClass(LongNullHashSetEdges.class); + GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset( + conf, MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX); + } + + /** + * Piece in which all vertices will send to their neighbors the list of their + * neighbors and vertices will count how many of received ids are their + * neighbors. + * TODO could be striped to make memory requirements lower + */ + public static class UndirectedCountTrianglesPiece extends + Piece<LongWritable, LongWritable, NullWritable, WLongArrayList, Object> { + @Override + public VertexSender<LongWritable, LongWritable, NullWritable> + getVertexSender( + BlockWorkerSendApi<LongWritable, LongWritable, NullWritable, + WLongArrayList> workerApi, + Object executionStage) { + WLongArrayList reusableMessage = new WLongArrayList(); + return vertex -> { + reusableMessage.clear(); + for (Edge<LongWritable, NullWritable> edge : vertex.getEdges()) { + // TODO We could only send edge once + reusableMessage.add(edge.getTargetVertexId().get()); + } + workerApi.sendMessageToAllEdges(vertex, reusableMessage); + }; + } + + @Override + public VertexReceiver<LongWritable, LongWritable, NullWritable, + WLongArrayList> getVertexReceiver( + BlockWorkerReceiveApi<LongWritable> workerApi, Object executionStage) { + LongWritable reusableId = new LongWritable(); + return (vertex, messages) -> { + long numTriangles = 0; + for (WLongArrayList message : messages) { + LongListIterator iter = message.iterator(); + while (iter.hasNext()) { + reusableId.set(iter.nextLong()); + if (reusableId.get() != vertex.getId().get() && + vertex.getEdgeValue(reusableId) != null) { + numTriangles++; + } + } + } + Preconditions.checkArgument(numTriangles % 2 == 0); + vertex.getValue().set(numTriangles / 2); + }; + } + + @Override + protected Class<WLongArrayList> getMessageClass() { + return WLongArrayList.class; + } + + @Override + protected boolean allowOneMessageToManyIdsEncoding() { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/77a24a9f/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/package-info.java new file mode 100644 index 0000000..330a384 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/triangles/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Triangle counting app + */ +package org.apache.giraph.block_app.library.triangles; http://git-wip-us.apache.org/repos/asf/giraph/blob/77a24a9f/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/triangles/TestUndirectedTriangleCounting.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/triangles/TestUndirectedTriangleCounting.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/triangles/TestUndirectedTriangleCounting.java new file mode 100644 index 0000000..5570c6c --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/triangles/TestUndirectedTriangleCounting.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.triangles; + +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test for {@link UndirectedTriangleCountingBlockFactory} + */ +public class TestUndirectedTriangleCounting { + @Test + public void smallTest() throws Exception { + GiraphConfiguration conf = new GiraphConfiguration(); + BlockUtils.setAndInitBlockFactoryClass( + conf, UndirectedTriangleCountingBlockFactory.class); + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = + new NumericTestGraph<>(conf); + + graph.addSymmetricEdge(1, 4); + graph.addSymmetricEdge(1, 5); + graph.addSymmetricEdge(1, 6); + graph.addSymmetricEdge(2, 4); + graph.addSymmetricEdge(2, 6); + graph.addSymmetricEdge(3, 6); + graph.addSymmetricEdge(4, 5); + graph.addSymmetricEdge(5, 6); + + graph.addSymmetricEdge(7, 8); + graph.addSymmetricEdge(7, 9); + graph.addSymmetricEdge(8, 9); + + graph.addSymmetricEdge(10, 11); + graph.addSymmetricEdge(10, 12); + graph.addSymmetricEdge(10, 13); + graph.addSymmetricEdge(10, 14); + graph.addSymmetricEdge(11, 12); + graph.addSymmetricEdge(11, 13); + graph.addSymmetricEdge(12, 13); + graph.addSymmetricEdge(13, 14); + + LocalBlockRunner.runApp(graph.getTestGraph()); + + assertEquals(graph.getValue(1).get(), 2); + assertEquals(graph.getValue(2).get(), 0); + assertEquals(graph.getValue(3).get(), 0); + assertEquals(graph.getValue(4).get(), 1); + assertEquals(graph.getValue(5).get(), 2); + assertEquals(graph.getValue(6).get(), 1); + assertEquals(graph.getValue(7).get(), 1); + assertEquals(graph.getValue(8).get(), 1); + assertEquals(graph.getValue(9).get(), 1); + assertEquals(graph.getValue(10).get(), 4); + assertEquals(graph.getValue(11).get(), 3); + assertEquals(graph.getValue(12).get(), 3); + assertEquals(graph.getValue(13).get(), 4); + assertEquals(graph.getValue(14).get(), 1); + } +}
