Repository: giraph Updated Branches: refs/heads/trunk 1c5544248 -> 7cacb1830
GIRAPH-1061: Add Connected Components block factory Summary: Add block factory for Connected Components to make it easy to run it. Test Plan: Added a test, mvn clean verify Differential Revision: https://reviews.facebook.net/D57951 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7cacb183 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7cacb183 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7cacb183 Branch: refs/heads/trunk Commit: 7cacb1830c3385098d308979ae45e3ea42b9b16c Parents: 1c55442 Author: Maja Kabiljo <[email protected]> Authored: Mon May 9 16:47:13 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue May 10 10:46:30 2016 -0700 ---------------------------------------------------------------------- .../ConnectedComponentsBlockFactory.java | 95 ++++++++++++++++ .../connected_components/package-info.java | 21 ++++ .../TestConnectedComponentsBlockFactory.java | 108 +++++++++++++++++++ 3 files changed, 224 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7cacb183/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/ConnectedComponentsBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/ConnectedComponentsBlockFactory.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/ConnectedComponentsBlockFactory.java new file mode 100644 index 0000000..872cfd5 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/ConnectedComponentsBlockFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.connected_components; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.IfBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.prepare_graph.PrepareGraphPieces; +import org.apache.giraph.block_app.library.prepare_graph.UndirectedConnectedComponents; +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.BooleanConfOption; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.edge.LongNullArrayEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Block factory for connected components. + * <p> + * Vertex ids and vertex values are {@link LongWritable}, and edge values are + * {@link NullWritable}. After the computation each vertex value holds the id + * of the connected component the vertex belongs to (the accompanying + * TestConnectedComponentsBlockFactory expects the smallest vertex id of the + * component). + */ +public class ConnectedComponentsBlockFactory + extends AbstractBlockFactory<Object> { + /** + * If the input graph is already symmetric, this can be set to false to skip + * the symmetrizing step; otherwise keep it true (the default) in
order to get correct results. + */ + public static final BooleanConfOption MAKE_GRAPH_SYMMETRIC = + new BooleanConfOption("giraph.connectedComponents.makeGraphSymmetric", + true, "Whether it's needed to first make graph symmetric or not"); + + @Override + protected Class<? extends WritableComparable> getVertexIDClass( + GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<? extends Writable> getVertexValueClass( + GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<? extends Writable> getEdgeValueClass( + GiraphConfiguration conf) { + return NullWritable.class; + } + + /** + * Builds the computation: optionally symmetrize the graph (controlled by + * {@link #MAKE_GRAPH_SYMMETRIC}), then run + * {@link UndirectedConnectedComponents#calculateConnectedComponents}, which + * reads and writes the component id through the vertex value. + */ + @Override + public Block createBlock(GiraphConfiguration conf) { + return new SequenceBlock( + // Make graph symmetric if needed + new IfBlock(() -> MAKE_GRAPH_SYMMETRIC.get(conf), + PrepareGraphPieces.makeSymmetricUnweighted(LongTypeOps.INSTANCE)), + UndirectedConnectedComponents. + <LongWritable>calculateConnectedComponents( + // NOTE(review): presumably an iteration cap, i.e. effectively + // "run until convergence" -- confirm against the callee's signature + Integer.MAX_VALUE, + // The current component id is read from the vertex value ... + Vertex::getValue, + // ... and the computed component id is written back into it + (vertex, component) -> vertex.getValue().set(component.get()))); + } + + /** + * This factory keeps no execution-stage state; a plain {@link Object} is + * returned as a placeholder. + */ + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected void additionalInitConfig(GiraphConfiguration conf) { + // Save on network traffic by only sending one message value per worker + GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset( + conf, MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION); + // NOTE(review): unlike the setIfUnset call above, this sets the OutEdges + // class unconditionally, so a user-configured value is presumably + // replaced -- confirm this is intended. + conf.setOutEdgesClass(LongNullArrayEdges.class); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/7cacb183/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/package-info.java
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/package-info.java new file mode 100644 index 0000000..6b1261a --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/connected_components/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Block factory for connected components + */ +package org.apache.giraph.block_app.library.connected_components; http://git-wip-us.apache.org/repos/asf/giraph/blob/7cacb183/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/connected_components/TestConnectedComponentsBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/connected_components/TestConnectedComponentsBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/connected_components/TestConnectedComponentsBlockFactory.java new file mode 100644 index 0000000..3bf8e4f --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/connected_components/TestConnectedComponentsBlockFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.block_app.library.connected_components; + +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ConnectedComponentsBlockFactory}. + * <p> + * After running the factory, every vertex value is expected to equal the + * smallest vertex id of the connected component the vertex belongs to. + * Assertions use JUnit's {@code assertEquals(expected, actual)} order so that + * failure messages report the expected and actual values correctly. + */ +public class TestConnectedComponentsBlockFactory { + /** + * Edges are added symmetrically, so the symmetrizing step is disabled via + * {@link ConnectedComponentsBlockFactory#MAKE_GRAPH_SYMMETRIC}. + * Components: {0, 1, 2, 3, 5, 6} and {8, 9}. + */ + @Test + public void testAlreadySymmetric() throws Exception { + GiraphConfiguration conf = new GiraphConfiguration(); + BlockUtils.setAndInitBlockFactoryClass( + conf, ConnectedComponentsBlockFactory.class); + ConnectedComponentsBlockFactory.MAKE_GRAPH_SYMMETRIC.set(conf, false); + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = + new NumericTestGraph<>(conf); + + graph.addSymmetricEdge(1, 2); + graph.addSymmetricEdge(2, 3); + graph.addSymmetricEdge(3, 0); + graph.addSymmetricEdge(0, 5); + graph.addSymmetricEdge(0, 6); + graph.addSymmetricEdge(8, 9); + + LocalBlockRunner.runApp(graph.getTestGraph()); + + assertEquals(0, graph.getValue(0).get()); + assertEquals(0, graph.getValue(1).get()); + assertEquals(0, graph.getValue(2).get()); + assertEquals(0, graph.getValue(3).get()); + assertEquals(0, graph.getValue(5).get()); + assertEquals(0, graph.getValue(6).get()); + assertEquals(8, graph.getValue(8).get()); + assertEquals(8, graph.getValue(9).get()); + } + + /** + * Edges are added in one direction only and the default configuration is + * used, so the factory has to symmetrize the graph first. Components: + * {-2, 1, 2, 3, 4, 5, 12, 13}, {6, 7, 8, 10, 11}, and the isolated vertex 9. + */ + @Test + public void testNotSymmetric() throws Exception { + GiraphConfiguration conf = new GiraphConfiguration(); + BlockUtils.setAndInitBlockFactoryClass( + conf, ConnectedComponentsBlockFactory.class); + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = + new NumericTestGraph<>(conf); + + graph.addEdge(-2, 4); + graph.addEdge(1, 2); + graph.addEdge(1, 3); + graph.addEdge(1, -2); + graph.addEdge(3, 1); +
graph.addEdge(3, 4); + graph.addEdge(4, 3); + graph.addEdge(4, 13); + graph.addEdge(12, 5); + graph.addEdge(12, 13); + graph.addEdge(13, 4); + + graph.addEdge(6, 7); + graph.addEdge(7, 11); + graph.addEdge(8, 6); + graph.addEdge(10, 7); + graph.addEdge(10, 11); + + graph.addVertex(9); + + LocalBlockRunner.runApp(graph.getTestGraph()); + + assertEquals(-2, graph.getValue(-2).get()); + assertEquals(-2, graph.getValue(1).get()); + assertEquals(-2, graph.getValue(2).get()); + assertEquals(-2, graph.getValue(3).get()); + assertEquals(-2, graph.getValue(4).get()); + assertEquals(-2, graph.getValue(5).get()); + assertEquals(6, graph.getValue(6).get()); + assertEquals(6, graph.getValue(7).get()); + assertEquals(6, graph.getValue(8).get()); + assertEquals(9, graph.getValue(9).get()); + assertEquals(6, graph.getValue(10).get()); + assertEquals(6, graph.getValue(11).get()); + assertEquals(-2, graph.getValue(12).get()); + assertEquals(-2, graph.getValue(13).get()); + } +}
