Hello,

So this week, during TinkerPop's code freeze, Kuppitz and I have been stress testing SparkGraphComputer on a 4-blade cluster using the Friendster dataset (125M vertices and 2.5B edges).
This is a list of things we learned and fixed.

First, Daniel Kuppitz wrote this really helpful script that gave us a huge boost in testing turnaround time. It does the following:

   1. Pulls the latest from git://.
   2. Builds the code (being smart to delete grapes!).
   3. :install's the hadoop/spark plugins into the Gremlin console.
   4. Distributes the HADOOP_GREMLIN_LIBS jars to all SparkServer nodes in the cluster.
   5. Restarts the Spark cluster.
   6. Plops you at the console ready to rock.

Summary: here are the runtimes for g.V().count() over SparkGraphComputer's lifetime with Friendster.

   - TinkerPop 3.0.0.MX: 2.5 hours
   - TinkerPop 3.0.0: 1.5 hours
   - TinkerPop 3.1.1: 23 minutes

*** Of course, this is NOT good for g.V().count() in general, but realize we are loading the entire graph, even though we only need vertices (no edges or properties).
   https://issues.apache.org/jira/browse/TINKERPOP-962 (g.V().count() should really only take ~5 minutes)

For g.V().out().out().out().count() over Friendster:

   - TinkerPop 3.0.0.MX: 12.8 hours
   - TinkerPop 3.0.0: 8.6 hours
   - TinkerPop 3.1.1: 2.4 hours

Answer: 215664338057221 (that's 215 trillion length-3 paths in Friendster)

1. It's all about GC control.
   - Make many workers (we have 5 per machine), each with a relatively small heap (10 gigs each).
   - The massive-heap, store-everything-in-memory model doesn't work -- it just leads to GC stalls.

2. Make use of TinkerPop's new gremlin.spark.graphStorageLevel (default is MEMORY_ONLY).
   - I like DISK_ONLY as the whole RDD is cached in the cluster's file system.
   - No fetching back to HDFS for data (especially when you are using ScriptInputFormat, which is expensive!).
   - And you definitely don't want to go back to the graph database and stress it needlessly.
   - I don't like MEMORY_AND_DISK unless you know your whole graph will fit in memory. Once you have to start swapping things out of memory, GC.
   - If you have lots and lots of workers (a big cluster), then MEMORY_AND_DISK might be good. Be cool if someone tested it.
   - DISK_ONLY is a lot like Hadoop: streaming in records one at a time from the disk (it's fast).

3. I had an insane-o bug in our Combiner implementation. < MAX_AMOUNT should have been <= MAX_AMOUNT.
   - For g.V().count(), I went from shuffling 500MB of data over the network to 6.4KB.
   - The job sped up by 30 minutes after that fix.
   - Again, the main reason it was so slow: GC. A 500MB stream of long payloads reduced on a single machine. Moron.
   - It's a really bad idea to NOT use combiners for both VertexProgram and MapReduce.
   - This is like the difference between working and not working.
   - Also, this is what scares me about path-based traversers (match(), as(), path(), etc.). They can't be bulk'd easily.
   - We will get smart here though. I have some inklings.

4. It's an absolute must that you partition your graph once loaded.
   - Once the graph is loaded (graphDB, HDFS, etc.), the Spark partitioner "organizes" the graph around the cluster and then persists it.
   - For Friendster, this takes about 15 minutes (w/ ScriptInputFormat as the read-from-HDFS parser).
   - This is important because the loaded graphRDD is static and just gets a view propagated through it at each iteration. You don't want to keep shuffling this monster on each iteration.
   - This is also why PersistedXXXRDD is crucial. If you are going to keep running jobs on the same data, the RDD being reused is already partitioned for you! (tada)
   - For graph system providers, if you provide an InputRDD and you have a partitioner for it, that is a huge savings for SparkGraphComputer. So smart to do so.
   - By partitioning upfront, I was able to reduce the shuffle load from ~22GB to ~2GB per vertex program iteration on Friendster. Insane.
   - I was a fool before. I now know how to read Spark logs :) which is probably a good thing for me to know.
   - This is so important that we now just do it automatically.
   - However, if the data source and the Spark cluster are already "pair partitioned" we don't repartition! (elegant).
   - http://tinkerpop.incubator.apache.org/docs/3.1.1-SNAPSHOT/reference/#sparkgraphcomputer (scroll down to the "InputRDD and OutputRDD" section).

5. The graph data message/view shuffle is a lot of data. Make use of lots of TinkerPop workers() to reduce spills to disk.
   - TinkerPop 3.1.0 introduced GraphComputer.workers(). In SparkGraphComputer, this is the number of partitions in the RDD.
   - For Friendster, ScriptInputFormat gives me 229 partitions and g.V().count() takes 48 minutes.
   - If I 5x this to 1145 using "workers(1145)", g.V().count() takes 25 minutes.
   - That's roughly a 2x speed-up by just chopping the data into finer slices.
   - However, for 2290 workers, g.V().count() only gets marginally better -- 23 minutes.
   - This is all about not spilling to disk and not getting GC all up in it.
   - Now imagine if the graph provider's InputRDD already has a partitioner -- you are looking at ~10 minutes to g.V().count() Friendster (or like 1 minute if we don't load edges)!

6. I think we need to make Gryo more efficient. I don't think our serialization is optimal :/. Data seems oversized for what it is. This is all assumptions right now.
   - I also use JavaSerializer for tinkerpop.Payload data, and given that that is a significant chunk of what is shuffled --- it might be more that than Gryo. :|
   - https://issues.apache.org/jira/browse/TINKERPOP-1110

7. There is one last area of the implementation that I think could be improved. But besides that (and minor "use fewer objects"-style optimizations), I think SparkGraphComputer *is* how it should be.
   - https://issues.apache.org/jira/browse/TINKERPOP-1108
   - If you are a Spark expert, please do review the code and provide feedback. It's really not that much code.
   - https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
   - https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java

Anywho… that's that. TinkerPop 3.1.1 will be sweeeeet.

Enjoy!,
Marko.

http://markorodriguez.com
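
P.S. On the combiner point above (the 500MB to 6.4KB shuffle drop), here is a rough sketch of why combining before the shuffle matters so much for something like a count. This is plain Java and NOT TinkerPop's combiner code; the class name CombinerSketch and its methods are made up for illustration, and "partitions" are just int arrays standing in for how many vertices each partition holds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: not TinkerPop or Spark API.
public class CombinerSketch {

    // Without a combiner: every partition ships one Long per vertex it holds.
    static List<Long> withoutCombiner(int[] verticesPerPartition) {
        List<Long> shuffled = new ArrayList<>();
        for (int n : verticesPerPartition) {
            for (int i = 0; i < n; i++) {
                shuffled.add(1L);
            }
        }
        return shuffled;
    }

    // With a combiner: each partition sums locally and ships a single Long.
    static List<Long> withCombiner(int[] verticesPerPartition) {
        List<Long> shuffled = new ArrayList<>();
        for (int n : verticesPerPartition) {
            long partial = 0L;
            for (int i = 0; i < n; i++) {
                partial += 1L;
            }
            shuffled.add(partial);
        }
        return shuffled;
    }

    // The reducer is the same either way: sum whatever arrived over the wire.
    static long reduce(List<Long> shuffled) {
        long total = 0L;
        for (long v : shuffled) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        int[] partitions = {1000, 2500, 400};
        List<Long> raw = withoutCombiner(partitions);
        List<Long> combined = withCombiner(partitions);
        System.out.println("shuffled without combiner: " + raw.size() + " longs");
        System.out.println("shuffled with combiner:    " + combined.size() + " longs");
        System.out.println("same answer: " + (reduce(raw) == reduce(combined)));
    }
}
```

The reducer sees the same total either way, but in the uncombined case a single machine has to swallow one object per vertex, which is where the GC pain comes from.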