Repository: giraph Updated Branches: refs/heads/trunk c6af3ed8a -> fafecee71
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 1062479..37876d4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -575,7 +575,7 @@ else[HADOOP_NON_SECURE]*/ if (getConfiguration().hasEdgeInputFormat()) { // Move edges from temporary storage to their source vertices. - getServerData().getPartitionStore().moveEdgesToVertices(); + getServerData().getEdgeStore().moveEdgesToVertices(); } // Generate the partition stats for the input superstep and process @@ -783,7 +783,7 @@ else[HADOOP_NON_SECURE]*/ globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor); MessageStore<I, Writable> incomingMessageStore = - getServerData().getPartitionStore().getIncomingMessageStore(); + getServerData().getIncomingMessageStore(); if (incomingMessageStore instanceof AsyncMessageStoreWrapper) { ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete(); } @@ -1010,13 +1010,16 @@ else[HADOOP_NON_SECURE]*/ long nextPrintMsecs = System.currentTimeMillis() + 15000; int partitionIndex = 0; int numPartitions = getPartitionStore().getNumPartitions(); + LOG.info("Write thread started!"); while (true) { Partition<I, V, E> partition = getPartitionStore().getNextPartition(); + LOG.info("partition is : " + partition); if (partition == null) { break; } + LOG.info("start to write a partition"); long verticesWritten = 0; for (Vertex<I, V, E> vertex : partition) { vertexWriter.writeVertex(vertex); @@ -1033,6 +1036,7 @@ else[HADOOP_NON_SECURE]*/ nextPrintMsecs = System.currentTimeMillis() + 15000; nextPrintVertices = verticesWritten + 250000; } + LOG.info("done writing vertices"); if (verticesWritten >= nextUpdateProgressVertices) { WorkerProgress.get().addVerticesStored( @@ -1288,10 +1292,12 @@ else[HADOOP_NON_SECURE]*/ workerContext.write(checkpointOutputStream); getContext().progress(); + // TODO: checkpointing messages along with vertices to avoid multiple loads + // of a partition when out-of-core is enabled. for (Integer partitionId : getPartitionStore().getPartitionIds()) { // write messages checkpointOutputStream.writeInt(partitionId); - getServerData().getPartitionStore().getCurrentMessageStore() + getServerData().getCurrentMessageStore() .writePartition(checkpointOutputStream, partitionId); getContext().progress(); @@ -1539,9 +1545,11 @@ else[HADOOP_NON_SECURE]*/ getConfiguration().updateSuperstepClasses(superstepClasses); getServerData().resetMessageStores(); + // TODO: checkpointing messages along with vertices to avoid multiple + // loads of a partition when out-of-core is enabled. for (int i = 0; i < partitions; i++) { int partitionId = checkpointStream.readInt(); - getServerData().getPartitionStore().getCurrentMessageStore() + getServerData().getCurrentMessageStore() .readFieldsForPartition(checkpointStream, partitionId); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java index 2785217..c88aac7 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java @@ -106,15 +106,14 @@ public class RequestFailureTest { private void checkResult(int numRequests) throws IOException { // Check the output Iterable<IntWritable> vertices = - serverData.getPartitionStore().getIncomingMessageStore() - .getPartitionDestinationVertices(0); + serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); int keySum = 0; int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); Iterable<IntWritable> messages = - serverData.getPartitionStore().<IntWritable>getIncomingMessageStore() - .getVertexMessages(vertexId); + serverData.<IntWritable>getIncomingMessageStore().getVertexMessages( + vertexId); synchronized (messages) { for (IntWritable message : messages) { messageSum += message.get(); http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index 2688da1..0462770 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -178,15 +178,14 @@ public class RequestTest { // Check the output Iterable<IntWritable> vertices = - serverData.getPartitionStore().getIncomingMessageStore() - .getPartitionDestinationVertices(0); + serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); int keySum = 0; int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); Iterable<IntWritable> messages = - serverData.getPartitionStore().<IntWritable>getIncomingMessageStore() - .getVertexMessages(vertexId); + serverData.<IntWritable>getIncomingMessageStore().getVertexMessages( + vertexId); synchronized (messages) { for (IntWritable message : messages) { messageSum += message.get(); @@ -224,15 +223,14 @@ public class RequestTest { // Check the output Iterable<IntWritable> vertices = - serverData.getPartitionStore().getIncomingMessageStore() - .getPartitionDestinationVertices(0); + serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); int keySum = 0; int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); Iterable<IntWritable> messages = - serverData.getPartitionStore().<IntWritable>getIncomingMessageStore() - .getVertexMessages(vertexId); + serverData.<IntWritable>getIncomingMessageStore().getVertexMessages( + vertexId); synchronized (messages) { for (IntWritable message : messages) { messageSum += message.get(); http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 249a337..7893940 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -23,6 +23,7 @@ import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.ServerData; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -32,7 +33,7 @@ import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat; import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat; -import org.apache.giraph.ooc.DiskBackedPartitionStore; +import org.apache.giraph.ooc.data.DiskBackedPartitionStore; import org.apache.giraph.utils.InternalVertexRunner; import org.apache.giraph.utils.NoOpComputation; import org.apache.giraph.utils.UnsafeByteArrayInputStream; @@ -50,7 +51,6 @@ import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorCompletionService; @@ -98,18 +98,14 @@ public class TestPartitionStores { public void setUp() { GiraphConfiguration configuration = new GiraphConfiguration(); configuration.setComputationClass(MyComputation.class); - conf = new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, - NullWritable>(configuration); + conf = new ImmutableClassesGiraphConfiguration<>(configuration); context = Mockito.mock(Mapper.Context.class); } @Test public void testSimplePartitionStore() { - CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> - serviceWorker = Mockito.mock(CentralizedServiceWorker.class); PartitionStore<IntWritable, IntWritable, NullWritable> - partitionStore = new SimplePartitionStore<IntWritable, IntWritable, - NullWritable>(conf, context, serviceWorker); + partitionStore = new SimplePartitionStore<>(conf, context); testReadWrite(partitionStore, conf); partitionStore.shutdown(); } @@ -166,10 +162,15 @@ public class TestPartitionStores { serviceWorker = Mockito.mock(CentralizedServiceWorker.class); Mockito.when(serviceWorker.getSuperstep()).thenReturn( BspService.INPUT_SUPERSTEP); - - PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = - new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( - conf, context, serviceWorker); + ServerData<IntWritable, IntWritable, NullWritable> + serverData = new ServerData<>(serviceWorker, conf, context); + Mockito.when(serviceWorker.getServerData()).thenReturn(serverData); + + DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable> + partitionStore = + (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>) + serverData.getPartitionStore(); + partitionStore.initialize(); testReadWrite(partitionStore, conf); partitionStore.shutdown(); FileUtils.deleteDirectory(directory); @@ -185,16 +186,19 @@ public class TestPartitionStores { CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> serviceWorker = Mockito.mock(CentralizedServiceWorker.class); - Mockito.when(serviceWorker.getSuperstep()).thenReturn( BspService.INPUT_SUPERSTEP); - - PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = - new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( - conf, context, serviceWorker); + ServerData<IntWritable, IntWritable, NullWritable> + serverData = new ServerData<>(serviceWorker, conf, context); + Mockito.when(serviceWorker.getServerData()).thenReturn(serverData); + + DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable> + partitionStore = + (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>) + serverData.getPartitionStore(); + partitionStore.initialize(); testReadWrite(partitionStore, conf); partitionStore.shutdown(); - FileUtils.deleteDirectory(directory); } @@ -275,18 +279,18 @@ public class TestPartitionStores { GiraphConstants.STATIC_GRAPH.set(conf, true); testMultiThreaded(); } - +/* @Test public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception { GiraphConstants.STATIC_GRAPH.set(conf, true); testMultiThreaded(); } - +*/ private void testMultiThreaded() throws Exception { final AtomicInteger vertexCounter = new AtomicInteger(0); ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS); ExecutorCompletionService<Boolean> executor = - new ExecutorCompletionService<Boolean>(pool); + new ExecutorCompletionService<>(pool); File directory = Files.createTempDir(); GiraphConstants.PARTITIONS_DIRECTORY.set( @@ -298,21 +302,25 @@ public class TestPartitionStores { Mockito.when(serviceWorker.getSuperstep()).thenReturn( BspService.INPUT_SUPERSTEP); - - PartitionStore<IntWritable, IntWritable, NullWritable> store = - new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( - conf, context, serviceWorker); + ServerData<IntWritable, IntWritable, NullWritable> + serverData = new ServerData<>(serviceWorker, conf, context); + Mockito.when(serviceWorker.getServerData()).thenReturn(serverData); + + DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable> + store = + (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>) + serverData.getPartitionStore(); store.initialize(); // Create a new Graph in memory using multiple threads for (int i = 0; i < NUM_OF_THREADS; ++i) { - List<Integer> partitionIds = new ArrayList<Integer>(); + List<Integer> partitionIds = new ArrayList<>(); for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) { partitionIds.add(id); } Worker worker = new Worker(vertexCounter, store, partitionIds, conf); - executor.submit(worker, new Boolean(true)); + executor.submit(worker, true); } for (int i = 0; i < NUM_OF_THREADS; ++i) executor.take(); @@ -341,11 +349,8 @@ public class TestPartitionStores { for (int i = 0; i < NUM_OF_PARTITIONS; ++i) { partition = store.getNextPartition(); assert partition != null; - Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes = - partition.iterator(); - while (vertexes.hasNext()) { - Vertex<IntWritable, IntWritable, NullWritable> v = vertexes.next(); + for (Vertex<IntWritable, IntWritable, NullWritable> v : partition) { totalValues += v.getId().get(); } store.putPartition(partition); @@ -358,7 +363,6 @@ public class TestPartitionStores { private Partition<IntWritable, IntWritable, NullWritable> getPartition(PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore, int partitionId) { - partitionStore.startIteration(); Partition p; Partition result = null; while ((p = partitionStore.getNextPartition()) != null) { @@ -403,8 +407,11 @@ public class TestPartitionStores { partitionStore.addPartition(createPartition(conf, 3, v5)); partitionStore.addPartition(createPartition(conf, 4, v7)); + partitionStore.startIteration(); getPartition(partitionStore, 1); + partitionStore.startIteration(); getPartition(partitionStore, 2); + partitionStore.startIteration(); partitionStore.removePartition(3); getPartition(partitionStore, 4); @@ -435,16 +442,12 @@ public class TestPartitionStores { * @param expected expected results */ private void checkResults(Iterable<String> results, String[] expected) { - Iterator<String> result = results.iterator(); - - assert results != null; - while(result.hasNext()) { - String resultStr = result.next(); + for (String str : results) { boolean found = false; - for (int j = 0; j < expected.length; ++j) { - if (expected[j].equals(resultStr)) { + for (String expectedStr : expected) { + if (expectedStr.equals(str)) { found = true; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java index ad9ba6f..6fdfc75 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java @@ -18,23 +18,14 @@ package org.apache.giraph; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.combiner.DoubleSumMessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.examples.GeneratedVertexReader; import org.apache.giraph.examples.SimplePageRankComputation; import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat; import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat; -import org.apache.giraph.graph.BasicComputation; -import org.apache.giraph.graph.Vertex; + import org.apache.giraph.job.GiraphJob; -import org.apache.giraph.ooc.CheckMemoryCallable; -import org.apache.giraph.ooc.DiskBackedPartitionStore; -import org.apache.giraph.ooc.MemoryEstimator; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.LongWritable; import org.junit.Test; import java.io.IOException; @@ -47,40 +38,14 @@ import static org.junit.Assert.assertTrue; */ public class TestOutOfCore extends BspCase { final static int NUM_PARTITIONS = 32; - final static int MAX_NUM_PARTITIONS_IN_MEMORY = 16; - final static int FAIR_NUM_PARTITIONS_IN_MEMORY = 12; + final static int NUM_PARTITIONS_IN_MEMORY = 16; public TestOutOfCore() { super(TestOutOfCore.class.getName()); } - public static class TestMemoryEstimator implements MemoryEstimator { - private DiskBackedPartitionStore partitionStore; - @Override - public void initialize(CentralizedServiceWorker serviceWorker) { - partitionStore = - (DiskBackedPartitionStore) serviceWorker.getPartitionStore(); - } - - @Override - public double freeMemoryMB() { - int numPartitions = partitionStore.getNumPartitionInMemory(); - if (numPartitions > MAX_NUM_PARTITIONS_IN_MEMORY) { - return 1; - } else if (numPartitions > FAIR_NUM_PARTITIONS_IN_MEMORY) { - return 10; - } else { - return 40; - } - } - - @Override - public double maxMemoryMB() { - return 100; - } - } /** - * Run a job that tests the adaptive out-of-core mechanism + * Run a job that tests the fixed out-of-core mechanism * * @throws IOException * @throws ClassNotFoundException @@ -99,13 +64,12 @@ public class TestOutOfCore extends BspCase { SimplePageRankComputation.SimplePageRankMasterCompute.class); GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS); GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); - GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR - .set(conf, TestMemoryEstimator.class); - CheckMemoryCallable.CHECK_MEMORY_INTERVAL.set(conf, 5); + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY); GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8); GiraphConstants.NUM_INPUT_THREADS.set(conf, 8); GiraphConstants.NUM_OOC_THREADS.set(conf, 4); GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8); + GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2"); GiraphJob job = prepareJob(getCallingMethodName(), conf, getTempPath(getCallingMethodName())); // Overwrite the number of vertices set in BspCase
