Repository: giraph Updated Branches: refs/heads/trunk 614df7f9f -> f732f300f
http://git-wip-us.apache.org/repos/asf/giraph/blob/bd4127b7/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java index a04b703..d3c392e 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java @@ -18,6 +18,11 @@ package org.apache.giraph.comm.messages; +import java.io.IOException; +import java.util.Iterator; + +import junit.framework.Assert; + import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.DoubleSumMessageCombiner; import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore; @@ -33,6 +38,7 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -42,14 +48,10 @@ import org.mockito.stubbing.Answer; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import junit.framework.Assert; - -import java.io.IOException; -import java.util.Iterator; - public class TestLongDoublePrimitiveMessageStores { private static final int NUM_PARTITIONS = 2; - private static CentralizedServiceWorker<LongWritable, ?, ?> service; + private static CentralizedServiceWorker<LongWritable, Writable, Writable> + service; @Before public void prepare() throws IOException { @@ -83,8 +85,9 @@ public class TestLongDoublePrimitiveMessageStores { } } - private static 
ImmutableClassesGiraphConfiguration<LongWritable, ?, ?> - createLongDoubleConf() { + private static ImmutableClassesGiraphConfiguration<LongWritable, Writable, + Writable> createLongDoubleConf() { + GiraphConfiguration initConf = new GiraphConfiguration(); initConf.setComputationClass(LongDoubleNoOpComputation.class); return new ImmutableClassesGiraphConfiguration(initConf); http://git-wip-us.apache.org/repos/asf/giraph/blob/bd4127b7/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 08f4544..7605fb5 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -18,40 +18,66 @@ package org.apache.giraph.partition; +import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY; +import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY; +import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT; +import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_GRAPH; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.io.FileUtils; +import org.apache.giraph.bsp.BspService; +import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; 
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.graph.BasicComputation; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; +import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat; +import org.apache.giraph.utils.InternalVertexRunner; import org.apache.giraph.utils.NoOpComputation; import org.apache.giraph.utils.UnsafeByteArrayInputStream; import org.apache.giraph.utils.UnsafeByteArrayOutputStream; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import com.google.common.collect.Iterables; import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - /** * Test case for partition stores. 
*/ +@SuppressWarnings("unchecked") public class TestPartitionStores { private ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, NullWritable> conf; private Mapper<?, ?, ?, ?>.Context context; + /* these static variables are used for the multithreaded tests */ + private static final int NUM_OF_VERTEXES_PER_THREAD = 10; + private static final int NUM_OF_EDGES_PER_VERTEX = 5; + private static final int NUM_OF_THREADS = 10; + private static final int NUM_OF_PARTITIONS = 3; + public static class MyComputation extends NoOpComputation<IntWritable, IntWritable, NullWritable, IntWritable> { } @@ -74,7 +100,7 @@ public class TestPartitionStores { configuration.setComputationClass(MyComputation.class); conf = new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, NullWritable>(configuration); - context = mock(Mapper.Context.class); + context = Mockito.mock(Mapper.Context.class); } @Test @@ -125,17 +151,24 @@ public class TestPartitionStores { } @Test - public void testDiskBackedPartitionStoreWithByteArrayPartition() throws IOException { + public void testDiskBackedPartitionStoreWithByteArrayPartition() + throws IOException { + File directory = Files.createTempDir(); GiraphConstants.PARTITIONS_DIRECTORY.set( conf, new File(directory, "giraph_partitions").toString()); GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); conf.setPartitionClass(ByteArrayPartition.class); - + + CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> + serviceWorker = Mockito.mock(CentralizedServiceWorker.class); + Mockito.when(serviceWorker.getSuperstep()).thenReturn( + BspService.INPUT_SUPERSTEP); + PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( - conf, context); + conf, context, serviceWorker); testReadWrite(partitionStore, conf); partitionStore.shutdown(); FileUtils.deleteDirectory(directory); @@ -149,20 +182,174 
@@ public class TestPartitionStores { GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> + serviceWorker = Mockito.mock(CentralizedServiceWorker.class); + + Mockito.when(serviceWorker.getSuperstep()).thenReturn( + BspService.INPUT_SUPERSTEP); + PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( - conf, context); + conf, context, serviceWorker); testReadWrite(partitionStore, conf); partitionStore.shutdown(); GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 2); partitionStore = new DiskBackedPartitionStore<IntWritable, - IntWritable, NullWritable>(conf, context); + IntWritable, NullWritable>(conf, context, serviceWorker); testReadWrite(partitionStore, conf); partitionStore.shutdown(); FileUtils.deleteDirectory(directory); } + @Test + public void testDiskBackedPartitionStoreWithByteArrayComputation() + throws Exception { + + Iterable<String> results; + String[] graph = + { + "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]", + "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]" + }; + String[] expected = + { + "1\t0", "2\t0", "3\t0", "4\t0", "5\t0", + "6\t0", "7\t0", "8\t0", "9\t0", "10\t0" + }; + + USE_OUT_OF_CORE_GRAPH.set(conf, true); + MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + USER_PARTITION_COUNT.set(conf, 10); + + File directory = Files.createTempDir(); + PARTITIONS_DIRECTORY.set(conf, + new File(directory, "giraph_partitions").toString()); + + conf.setPartitionClass(ByteArrayPartition.class); + conf.setComputationClass(EmptyComputation.class); + conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + results = InternalVertexRunner.run(conf, graph); + checkResults(results, expected); + FileUtils.deleteDirectory(directory); + } + + 
@Test + public void testDiskBackedPartitionStoreMT() throws Exception { + GiraphConstants.STATIC_GRAPH.set(conf, false); + testMultiThreaded(); + } + + /* + @Test + public void testDiskBackedPartitionStoreMTStatic() throws Exception { + GiraphConstants.STATIC_GRAPH.set(conf, true); + testMultiThreaded(); + } + */ + + private void testMultiThreaded() throws Exception { + final AtomicInteger vertexCounter = new AtomicInteger(0); + ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS); + ExecutorCompletionService<Boolean> executor = + new ExecutorCompletionService<Boolean>(pool); + + File directory = Files.createTempDir(); + GiraphConstants.PARTITIONS_DIRECTORY.set( + conf, new File(directory, "giraph_partitions").toString()); + GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + + CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> + serviceWorker = Mockito.mock(CentralizedServiceWorker.class); + + Mockito.when(serviceWorker.getSuperstep()).thenReturn( + BspService.INPUT_SUPERSTEP); + + PartitionStore<IntWritable, IntWritable, NullWritable> store = + new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( + conf, context, serviceWorker); + + // Create a new Graph in memory using multiple threads + for (int i = 0; i < NUM_OF_THREADS; ++i) { + int partitionId = i % NUM_OF_PARTITIONS; + Worker worker = + new Worker(vertexCounter, store, partitionId, conf); + executor.submit(worker, new Boolean(true)); + } + for (int i = 0; i < NUM_OF_THREADS; ++i) + executor.take(); + pool.shutdownNow(); + + // Check the number of vertices + int totalVertexes = 0; + int totalEdges = 0; + Partition<IntWritable, IntWritable, NullWritable> partition; + for (int i = 0; i < NUM_OF_PARTITIONS; ++i) { + partition = store.getOrCreatePartition(i); + totalVertexes += partition.getVertexCount(); + totalEdges += partition.getEdgeCount(); + store.putPartition(partition); + } + assert 
vertexCounter.get() == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD; + assert totalVertexes == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD; + assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX; + + // Check the content of the vertices + int expected = 0; + for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD * NUM_OF_VERTEXES_PER_THREAD; ++i) { + expected += i; + } + int totalValues = 0; + for (int i = 0; i < NUM_OF_PARTITIONS; ++i) { + partition = store.getOrCreatePartition(i); + Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes = + partition.iterator(); + + while (vertexes.hasNext()) { + Vertex<IntWritable, IntWritable, NullWritable> v = vertexes.next(); + totalValues += v.getId().get(); + } + store.putPartition(partition); + } + assert totalValues == expected; + + store.shutdown(); + } + + @Test + public void testDiskBackedPartitionStoreComputation() throws Exception { + Iterable<String> results; + String[] graph = + { + "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]", + "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]" + }; + String[] expected = + { + "1\t0", "2\t0", "3\t0", "4\t0", "5\t0", + "6\t0", "7\t0", "8\t0", "9\t0", "10\t0" + }; + + USE_OUT_OF_CORE_GRAPH.set(conf, true); + MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + USER_PARTITION_COUNT.set(conf, 10); + + File directory = Files.createTempDir(); + PARTITIONS_DIRECTORY.set(conf, + new File(directory, "giraph_partitions").toString()); + + conf.setComputationClass(EmptyComputation.class); + conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + results = InternalVertexRunner.run(conf, graph); + checkResults(results, expected); + FileUtils.deleteDirectory(directory); + } + /** * Test reading/writing to/from a partition store * @@ -238,17 +425,58 @@ public class TestPartitionStores { partitionStore.deletePartition(2); assertEquals(2, partitionStore.getNumPartitions()); } - 
+ + /** + * Internal checker to verify the correctness of the tests. + * @param results the actual results obtained + * @param expected expected results + */ + private void checkResults(Iterable<String> results, String[] expected) { + Iterator<String> result = results.iterator(); + + assert results != null; + + while(result.hasNext()) { + String resultStr = result.next(); + boolean found = false; + + for (int j = 0; j < expected.length; ++j) { + if (expected[j].equals(resultStr)) { + found = true; + } + } + + assert found; + } + } + + /** + * Test compute method that sends each edge a notification of its parents. + * The test set only has a 1-1 parent-to-child ratio for this unit test. + */ + public static class EmptyComputation + extends BasicComputation<LongWritable, DoubleWritable, FloatWritable, + LongWritable> { + + @Override + public void compute( + Vertex<LongWritable, DoubleWritable,FloatWritable> vertex, + Iterable<LongWritable> messages) throws IOException { + + vertex.voteToHalt(); + } + } + @Test public void testEdgeCombineWithSimplePartition() throws IOException { testEdgeCombine(SimplePartition.class); } - + @Test public void testEdgeCombineWithByteArrayPartition() throws IOException { testEdgeCombine(ByteArrayPartition.class); } - + private void testEdgeCombine(Class<? 
extends Partition> partitionClass) throws IOException { Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex(); @@ -286,4 +514,46 @@ public class TestPartitionStores { assertEquals(new IntWritable(1), v1.getValue()); assertEquals(2, v1.getNumEdges()); } + + private class Worker implements Runnable { + + private final AtomicInteger vertexCounter; + private final PartitionStore<IntWritable, IntWritable, NullWritable> + partitionStore; + private final int partitionId; + private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, + NullWritable> conf; + + public Worker(AtomicInteger vertexCounter, + PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore, + int partitionId, + ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, + NullWritable> conf) { + + this.vertexCounter = vertexCounter; + this.partitionStore = partitionStore; + this.partitionId = partitionId; + this.conf = conf; + } + + public void run() { + for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD; ++i) { + int id = vertexCounter.getAndIncrement(); + Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex(); + v.initialize(new IntWritable(id), new IntWritable(id)); + + Partition<IntWritable, IntWritable, NullWritable> partition = + partitionStore.getOrCreatePartition(partitionId); + + Random rand = new Random(id); + for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) { + int dest = rand.nextInt(id + 1); + v.addEdge(EdgeFactory.create(new IntWritable(dest))); + } + + partition.putVertex(v); + partitionStore.putPartition(partition); + } + } + } }
