> On Nov. 14, 2012, 1:08 a.m., Maja Kabiljo wrote:
> > Thanks, Avery, this is a lot of great work. The results seem good, and the
> > memory should really go down when this is extended to messages. We do trade
> > off some speed for the memory, which is why I'm not sure we should have
> > ByteArrayPartition as default.
> >
> > For these Input/Output classes, a significant part of the code is copied
> > from somewhere else, right? If so, I think it would be good to mention it
> > in the comment somewhere.

Thanks for the review. I agree that ByteArrayPartition should not be the default (SimplePartition should be). I think I was trying to make sure the tests passed with it and forgot to change it back. Thanks for spotting that.

You are right that a lot of the code is copied from DataInputStream/DataOutputStream. That is a good idea; I have added a note about it to the comments, as you suggested.


> On Nov. 14, 2012, 1:08 a.m., Maja Kabiljo wrote:
> > http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativePageRankBenchmark.java, line 31
> > <https://reviews.apache.org/r/7990/diff/2/?file=187723#file187723line31>
> >
> >     Rename to RepresentativeVertexPageRankBenchmark

Changed.


> On Nov. 14, 2012, 1:08 a.m., Maja Kabiljo wrote:
> > http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java, lines 108-111
> > <https://reviews.apache.org/r/7990/diff/2/?file=187752#file187752line108>
> >
> >     Can you create a separate function to check the remaining bytes, and
> >     call it everywhere?

Great suggestion. I added a new method, ensureRemaining(int requiredBytes), and used it everywhere I could.


> On Nov. 14, 2012, 1:08 a.m., Maja Kabiljo wrote:
> > http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java, line 169
> > <https://reviews.apache.org/r/7990/diff/2/?file=187754#file187754line169>
> >
> >     Why do we always write -1 here and ignore the first int value when
> >     reading?

This is the size of the byte array, written as an int. It is used in ByteArrayPartition#write()/readFields() so that we know how much of the byte[] to write. Since this is unintuitive, I added a comment to vertexMap to make it clearer:

  /**
   * Vertex map for this range (keyed by index). Note that the byte[] is a
   * serialized vertex with the first four bytes as the length of the vertex
   * to read.
   */
  private ConcurrentMap<I, byte[]> vertexMap;

I also added another comment in write():

  // Note here that we are writing the size of the vertex data first
  // as it is encoded in the first four bytes of the byte[]


> On Nov. 14, 2012, 1:08 a.m., Maja Kabiljo wrote:
> > http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java, line 49
> > <https://reviews.apache.org/r/7990/diff/2/?file=187724#file187724line49>
> >
> >     You can delete VertexIdMessageCollection class, right?

Deleted.


- Avery


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7990/#review13308
-----------------------------------------------------------


On Nov. 14, 2012, 3:06 a.m., Avery Ching wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/7990/
> -----------------------------------------------------------
>
> (Updated Nov. 14, 2012, 3:06 a.m.)
>
>
> Review request for giraph.
>
>
> Description
> -------
>
> Our entire graph is currently stored as Java objects in memory. I added an
> option to keep only a representative vertex that serializes/deserializes on
> the fly and should be used with the new ByteArrayPartition. In conjunction
> with a serialized client-side message cache, memory usage when loading
> shrinks to almost 1/10 of trunk and the input splits load almost 3x faster
> (see input superstep times below). I added a serializer based on Sun's
> Unsafe methods that enables these memory savings with a very small
> performance hit (roughly 1-5% slower). When the messages are also serialized
> with the faster serializer, compute time improves significantly compared to
> trunk as well (16.7 -> 12.31 secs for 2.5B edges, 2.97 -> 1.61 secs for 250M
> edges).
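
The representative-vertex idea above, together with the length-prefixed byte[] and the ensureRemaining() check discussed in the review replies, can be illustrated with a small self-contained sketch. It is not Giraph code: the class names (RepresentativeVertexSketch, ByteArrayReader, Vertex, Partition) and the vertex layout (long id, double value, long[] edge targets) are hypothetical, and plain big-endian DataOutputStream/ByteBuffer encoding stands in for the Unsafe-based serializer. Each vertex is stored as a 4-byte length followed by its serialized fields, and a single reusable Vertex object is deserialized on demand and written back with saveVertex().

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not Giraph's API): byte[]-backed partition with one reusable vertex. */
public class RepresentativeVertexSketch {

  /** Big-endian reader over a byte[] (same byte order as DataOutputStream). */
  static final class ByteArrayReader {
    private final ByteBuffer buf;
    ByteArrayReader(byte[] bytes) { this.buf = ByteBuffer.wrap(bytes); }

    /** One shared bounds check, called before every read. */
    void ensureRemaining(int requiredBytes) throws EOFException {
      if (buf.remaining() < requiredBytes) {
        throw new EOFException("need " + requiredBytes + " bytes, " + buf.remaining() + " left");
      }
    }
    int readInt() throws EOFException {
      ensureRemaining(4);
      return buf.getInt();
    }
    long readLong() throws EOFException {
      ensureRemaining(8);
      return buf.getLong();
    }
  }

  /** Hypothetical vertex layout: long id, double value, long[] edge targets. */
  static final class Vertex {
    long id;
    double value;
    long[] targets = new long[0];
    void write(DataOutputStream out) throws IOException {
      out.writeLong(id);
      out.writeDouble(value); // DataOutputStream writes the raw long bits of the double
      out.writeInt(targets.length);
      for (long t : targets) {
        out.writeLong(t);
      }
    }
    void readFields(ByteArrayReader in) throws EOFException {
      id = in.readLong();
      value = Double.longBitsToDouble(in.readLong());
      targets = new long[in.readInt()];
      for (int i = 0; i < targets.length; i++) {
        targets[i] = in.readLong();
      }
    }
  }

  /** Stores each vertex as [4-byte length][vertex bytes]; hands out one shared Vertex. */
  static final class Partition {
    private final Map<Long, byte[]> vertexMap = new HashMap<>();
    private final Vertex representative = new Vertex();
    /** Serializes a (possibly modified) vertex back into the map. */
    void saveVertex(Vertex vertex) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(0); // placeholder, patched with the real length below
      vertex.write(out);
      byte[] record = bytes.toByteArray();
      ByteBuffer.wrap(record).putInt(0, record.length - 4);
      vertexMap.put(vertex.id, record);
    }
    /** Deserializes into the shared representative vertex; null if the id is absent. */
    Vertex getVertex(long id) throws EOFException {
      byte[] record = vertexMap.get(id);
      if (record == null) {
        return null;
      }
      ByteArrayReader in = new ByteArrayReader(record);
      int length = in.readInt();  // size of the vertex data after the prefix
      in.ensureRemaining(length); // reject a truncated record up front
      representative.readFields(in);
      return representative;
    }
  }

  public static void main(String[] args) throws IOException {
    Partition partition = new Partition();
    Vertex v = new Vertex();
    v.id = 1; v.value = 0.5; v.targets = new long[] {2, 3};
    partition.saveVertex(v);
    v.id = 2; v.value = 0.25; v.targets = new long[] {1};
    partition.saveVertex(v);
    Vertex first = partition.getVertex(1);
    first.value = 0.75;          // mutate the shared object ...
    partition.saveVertex(first); // ... and write it back before fetching another vertex
    System.out.println(first == partition.getVertex(2)); // true: one object is reused
    System.out.println(partition.getVertex(1).value);    // 0.75
  }
}
```

Because getVertex() always returns the same object in this sketch, a caller has to save any change before asking for the next vertex; in exchange, the partition holds only byte[] values rather than one Java object graph per vertex.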
>
> The change also significantly reduces GC time, as there are fewer objects
> to collect. There are still further improvements to be made on the server
> side, where we still store our messages in memory; I (or someone else) can
> do that in a later patch.
>
> - Improves byte[] serialization significantly
> -- Added ExtendedDataInput/ExtendedDataOutput interfaces for the additional
>    methods needed for byte[] serialization/deserialization
> -- Added ExtendedByteArrayDataInput/ExtendedByteArrayDataOutput to
>    serialize/deserialize Writables to a byte[]
> -- Added DynamicChannelBufferOutputStream/DynamicChannelBufferInputStream to
>    serialize/deserialize Writables to a DynamicChannelBuffer
>
> - Gives you the choice of partition implementation: SimplePartition (default)
>   or ByteArrayPartition (serialized vertices)
> -- Added a new method to Partition called saveVertex(), which writes the
>    serialized vertex back into the ByteArrayPartition, or does nothing when
>    using SimplePartition
> - Gives you the choice of unsafe serialization (using Sun's Unsafe class;
>   the default) or regular serialization
> - Serializes the messages in the client-side cache into byte[] (saves memory
>   and also serializes faster)
> -- Created the new ByteArrayVertexIdMessageCollection to hold the serialized
>    messages
> -- SendVertexRequest now sends Partition objects rather than collections
> - Adds 2 more options to PageRankBenchmark to try out RepresentativeVertex or
>   RepresentativeVertex with unsafe serialization
> - Fixed a bug in LongDoubleFloatDoubleVertex#readFields() where edges weren't
>   cleared before deserializing
>
> - Added new unit tests
> -- Replaced TestEdgeListVertex with TestMutableVertex to test all our generic
>    MutableVertex implementations
> --- Added more tests of the different serialization methods
> -- TestPartitionStores has more tests of unsafe serialization/deserialization
>
>
> This addresses bug GIRAPH-417.
>     https://issues.apache.org/jira/browse/GIRAPH-417
>
>
> Diffs
> -----
>
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java 1408926
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java 1408926
>
> Diff: https://reviews.apache.org/r/7990/diff/
>
>
> Testing
> -------
>
> Testing:
>
> All unit tests pass
> Distributed unit tests pass (except two that also fail in trunk)
> Lots of PageRankBenchmark runs on a cluster
>
> Benchmark results:
>
> 25 edges / vertex, 10M vertices, 10 workers
> Trunk
> INFO 2012-11-08 14:43:55,855 [load-0] org.apache.giraph.graph.InputSplitsCallable - call: Loaded 1 input splits in 22.475897 secs, (v=1000000, e=25000000) 44492.105 vertices/sec, 1112302.6 edges/sec
> INFO 2012-11-08 14:44:00,411 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep -1 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 76580.54187774658M
> INFO 2012-11-08 14:44:05,254 [compute-7] org.apache.giraph.graph.ComputeCallable - call: Computation took 2.9732208 secs for 1 partitions on superstep 0. Flushing started
> INFO 2012-11-08 14:44:11,180 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Superstep 0, messages = 25000000 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 74781.9575881958M
> Total (milliseconds) 62,413 0 62,413
> Superstep 3 (milliseconds) 2,417 0 2,417
> Setup (milliseconds) 2,731 0 2,731
> Shutdown (milliseconds) 50 0 50
> Superstep 0 (milliseconds) 10,654 0 10,654
> Input superstep (milliseconds) 27,484 0 27,484
> Superstep 2 (milliseconds) 9,475 0 9,475
> Superstep 1 (milliseconds) 9,599 0 9,599
> Total time of GC in milliseconds 225,052 0 225,052
>
> 25 edges / vertex, 10M vertices, 10 workers
> SimplePartition + EdgeListVertex (after rebase)
> INFO 2012-11-08 14:33:15,907 [load-0] org.apache.giraph.graph.InputSplitsCallable - call: Loaded 1 input splits in 25.431986 secs, (v=1000000, e=25000000) 39320.562 vertices/sec, 983014.06 edges/sec
> INFO 2012-11-08 14:33:17,501 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep -1 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 76290.28507995605M
> INFO 2012-11-08 14:33:20,175 [compute-2] org.apache.giraph.graph.ComputeCallable - call: Computation took 2.0086238 secs for 1 partitions on superstep 0. Flushing started
> INFO 2012-11-08 14:33:26,667 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Superstep 0, messages = 25000000 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 73716.20901489258M
> Trunk (after rebase)
> Total (milliseconds) 68,113 0 68,113
> Superstep 3 (milliseconds) 2,057 0 2,057
> Setup (milliseconds) 9,765 0 9,765
> Shutdown (milliseconds) 59 0 59
> Superstep 0 (milliseconds) 9,180 0 9,180
> Input superstep (milliseconds) 27,525 0 27,525
> Superstep 2 (milliseconds) 9,600 0 9,600
> Superstep 1 (milliseconds) 9,924 0 9,924
> Total time of GC in milliseconds 216,345 0 216,345
>
> 25 edges / vertex, 10M vertices, 10 workers
> ByteArrayPartition + UnsafeRepresentativeVertex + reuse vertexdata buffer + unsafe serialization (after rebase)
> INFO 2012-11-08 14:33:09,822 [load-0] org.apache.giraph.graph.InputSplitsCallable - call: Loaded 1 input splits in 9.3217535 secs, (v=1000000, e=25000000) 107275.95 vertices/sec, 2681898.8 edges/sec
> INFO 2012-11-08 14:33:10,900 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep -1 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 79974.63636779785M
> INFO 2012-11-08 14:33:13,213 [compute-7] org.apache.giraph.graph.ComputeCallable - call: Computation took 1.6110481 secs for 1 partitions on superstep 0. Flushing started
> INFO 2012-11-08 14:33:13,972 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep 0 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 78228.54064941406M
> Total (milliseconds) 47,061 0 47,061
> Superstep 3 (milliseconds) 2,175 0 2,175
> Setup (milliseconds) 3,018 0 3,018
> Shutdown (milliseconds) 1,050 0 1,050
> Superstep 0 (milliseconds) 8,780 0 8,780
> Input superstep (milliseconds) 10,952 0 10,952
> Superstep 2 (milliseconds) 10,450 0 10,450
> Superstep 1 (milliseconds) 10,633 0 10,633
>
> 250 edges / vertex, 10M vertices, 10 workers
> Trunk
> INFO 2012-11-08 14:46:25,304 [load-0] org.apache.giraph.graph.InputSplitsCallable - call: Loaded 1 input splits in 167.02779 secs, (v=1000000, e=250000000) 5987.028 vertices/sec, 1496757.0 edges/sec
> INFO 2012-11-08 14:46:35,558 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep -1 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 38447.11888885498M
> INFO 2012-11-08 14:46:52,963 [compute-14] org.apache.giraph.graph.ComputeCallable - call: Computation took 16.770031 secs for 1 partitions on superstep 0. Flushing started
> INFO 2012-11-08 14:46:53,074 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep 0 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 24629.869369506836M
> Total (milliseconds) 568,094 0 568,094
> Superstep 3 (milliseconds) 2,344 0 2,344
> Setup (milliseconds) 2,748 0 2,748
> Shutdown (milliseconds) 47 0 47
> Superstep 0 (milliseconds) 67,853 0 67,853
> Input superstep (milliseconds) 177,722 0 177,722
> Superstep 2 (milliseconds) 247,518 0 247,518
> Superstep 1 (milliseconds) 69,856 0 69,856
> Total time of GC in milliseconds 2,741,892 0 2,741,892
>
> 250 edges / vertex, 10M vertices, 10 workers
> SimplePartition + EdgeListVertex (after rebase)
> INFO 2012-11-08 14:19:57,774 [load-0] org.apache.giraph.graph.InputSplitsCallable - call: Loaded 1 input splits in 172.17258 secs, (v=1000000, e=250000000) 5808.126 vertices/sec, 1452031.5 edges/sec
> INFO 2012-11-08 14:20:04,864 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep -1 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 37025.9013671875M
> INFO 2012-11-08 14:20:17,453 [compute-6] org.apache.giraph.graph.ComputeCallable - call: Computation took 11.959192 secs for 1 partitions on superstep 0. Flushing started
> INFO 2012-11-08 14:20:17,606 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep 0 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 21953.103630065918M
> Total (milliseconds) 470,845 0 470,845
> Superstep 3 (milliseconds) 2,595 0 2,595
> Setup (milliseconds) 1,774 0 1,774
> Shutdown (milliseconds) 54 0 54
> Superstep 0 (milliseconds) 59,609 0 59,609
> Input superstep (milliseconds) 179,665 0 179,665
> Superstep 2 (milliseconds) 165,848 0 165,848
> Superstep 1 (milliseconds) 61,296 0 61,296
> Total time of GC in milliseconds 2,480,260 0 2,480,260
>
> 250 edges / vertex, 10M vertices, 10 workers
> ByteArrayPartition + UnsafeRepresentativeVertex + reuse vertexdata buffer + unsafe serialization (after rebase)
> INFO 2012-11-08 13:26:50,334 [load-0] org.apache.giraph.graph.InputSplitsCallable - call: Loaded 1 input splits in 69.22095 secs, (v=1000000, e=250000000) 14446.494 vertices/sec, 3611623.5 edges/sec
> INFO 2012-11-08 13:26:52,511 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep -1 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 75393.74648284912M
> INFO 2012-11-08 13:27:06,441 [compute-5] org.apache.giraph.graph.ComputeCallable - call: Computation took 12.318953 secs for 1 partitions on superstep 0. Flushing started
> INFO 2012-11-08 13:27:06,483 [main] org.apache.giraph.graph.BspServiceWorker - finishSuperstep: Waiting on all requests, superstep 0 totalMem = 81728.6875M, maxMem = 81728.6875M, freeMem = 62303.2106552124M
> Total (milliseconds) 301,720 0 301,720
> Superstep 3 (milliseconds) 4,759 0 4,759
> Setup (milliseconds) 2,887 0 2,887
> Shutdown (milliseconds) 50 0 50
> Superstep 0 (milliseconds) 72,625 0 72,625
> Input superstep (milliseconds) 75,797 0 75,797
> Superstep 2 (milliseconds) 72,245 0 72,245
> Superstep 1 (milliseconds) 73,353 0 73,353
> Total time of GC in milliseconds 716,930 0 716,930
>
>
> Thanks,
>
> Avery Ching
>
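
The relative gains implied by the counters above can be checked with simple arithmetic. The sketch below (class and method names are illustrative only) divides each Trunk figure by the matching ByteArrayPartition + UnsafeRepresentativeVertex + unsafe serialization figure. The 25 edges / vertex comparison uses the runs whose logs show e=25000000 for v=1000000, and GC time is compared only for 250 edges / vertex because the 25 edges / vertex ByteArrayPartition counters do not report it.

```java
import java.util.Locale;

/** Illustrative only: speedups computed from the benchmark counters reported above. */
public class BenchmarkRatios {

  /** How many times larger `before` is than `after` (both in the same unit). */
  static double speedup(double before, double after) {
    return before / after;
  }

  static void report(String label, double trunk, double byteArray) {
    // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM locale
    System.out.printf(Locale.ROOT, "%-34s %.2fx%n", label, speedup(trunk, byteArray));
  }

  public static void main(String[] args) {
    // 25 edges / vertex: Trunk vs. ByteArrayPartition + unsafe serialization
    report("25 edges: input superstep (ms)", 27_484, 10_952);             // 2.51x
    report("25 edges: superstep 0 compute (s)", 2.9732208, 1.6110481);    // 1.85x
    report("25 edges: total (ms)", 62_413, 47_061);                       // 1.33x
    // 250 edges / vertex: Trunk vs. ByteArrayPartition + unsafe serialization
    report("250 edges: input superstep (ms)", 177_722, 75_797);           // 2.34x
    report("250 edges: superstep 0 compute (s)", 16.770031, 12.318953);   // 1.36x
    report("250 edges: total (ms)", 568_094, 301_720);                    // 1.88x
    report("250 edges: GC time (ms)", 2_741_892, 716_930);                // 3.82x
  }
}
```

A ratio above 1 means the ByteArrayPartition run was faster (or spent less time in GC) than Trunk for that metric.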
