Hello,

So this week, during TinkerPop's code freeze, Kuppitz and I have been stress 
testing SparkGraphComputer on a 4-blade cluster using the Friendster dataset 
(125M vertices and 2.5B edges).

This is a list of things we learned and fixed.

        First, Daniel Kuppitz wrote a really helpful script that gave us a 
huge boost in testing turnaround time. It does the following:
                1. Pulls the latest from git://.
                2. Builds the code (being smart enough to delete the Grape cache first!).
                3. Installs the Hadoop/Spark plugins into the Gremlin Console (via :install).
                4. Distributes the HADOOP_GREMLIN_LIBS jars to all Spark server nodes in the cluster.
                5. Restarts the Spark cluster.
                6. Plops you at the console, ready to rock.

        Summary: here are the runtimes for g.V().count() over 
SparkGraphComputer's lifetime on Friendster.
                - TinkerPop 3.0.0.MX:  2.5 hours
                - TinkerPop 3.0.0:     1.5 hours
                - TinkerPop 3.1.1:     23 minutes

                *** Of course, these are NOT good numbers for g.V().count() in general, 
but realize we are loading the entire graph even though we only need the vertices 
(no edges or properties).
                        https://issues.apache.org/jira/browse/TINKERPOP-962 (g.V().count() should really only take ~5 minutes)

                For g.V().out().out().out().count() over Friendster:
                        - TinkerPop 3.0.0.MX: 12.8 hours
                        - TinkerPop 3.0.0:     8.6 hours
                        - TinkerPop 3.1.1:     2.4 hours
                Answer: 215664338057221 (that's 215 trillion length-3 paths in 
Friendster)

        1. It's all about GC control. 
                - Make many workers (we have 5 per machine), each with a 
relatively small heap (10 gigs each). A config sketch follows below.
                - The massive-heap, store-everything-in-memory model doesn't 
work -- it just leads to GC stalls.
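                A minimal sketch of the memory side of this, assuming a Spark standalone cluster, the stock conf/hadoop/hadoop-gryo.properties, and the 3.1 console syntax. The 10g figure is just what worked for us, and the 5-workers-per-machine count is a Spark cluster setting (e.g. SPARK_WORKER_INSTANCES in spark-env.sh), not a TinkerPop one:

                        gremlin> // spark.* properties in the graph configuration are handed to the SparkConf, e.g. in the properties file:
                        gremlin> //   spark.master=spark://spark-master:7077    (hypothetical master URL)
                        gremlin> //   spark.executor.memory=10g                 (small heap per worker => short GC pauses)
                        gremlin> graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
                        gremlin> g = graph.traversal(computer(SparkGraphComputer))
                        gremlin> g.V().count()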
        
        2. Make use of TinkerPop's new gremlin.spark.graphStorageLevel (default 
is MEMORY_ONLY). A config sketch follows below.
                - I like DISK_ONLY, as the whole RDD is cached in the cluster's 
file system. 
                        - No fetching back to HDFS for data (especially when 
you are using ScriptInputFormat, which is expensive!).
                        - And you definitely don't want to go back to the graph 
database and stress it needlessly.
                - I don't like MEMORY_AND_DISK unless you know your whole graph 
will fit in memory. Once you have to start swapping things out of memory, GC kicks in.
                        - If you have lots and lots of workers (a big cluster), 
then MEMORY_AND_DISK might be good. It would be cool if someone tested it.
                - DISK_ONLY is a lot like Hadoop: streaming in records one at a 
time from disk (it's fast).
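                A minimal sketch of flipping the storage level, again assuming the stock conf/hadoop/hadoop-gryo.properties and the 3.1 console syntax:

                        gremlin> // in conf/hadoop/hadoop-gryo.properties, cache the loaded graphRDD on the workers' disks:
                        gremlin> //   gremlin.spark.graphStorageLevel=DISK_ONLY
                        gremlin> graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
                        gremlin> g = graph.traversal(computer(SparkGraphComputer))
                        gremlin> g.V().count()    // iterations stream the cached graphRDD from local disk rather than re-reading HDFS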

        3. I had an insane-o bug in our Combiner implementation: < MAX_AMOUNT 
should have been <= MAX_AMOUNT.
                - For g.V().count(), I went from shuffling 500MB of data over 
the network to 6.4KB.
                - The job sped up by 30 minutes after that fix.
                        - Again, the main reason it was so slow: GC. A 500MB 
stream of long payloads reduced down to a single machine. Moron.
                - It's a really bad idea to NOT use combiners for both 
VertexProgram and MapReduce. (A toy sketch of why follows below.)
                        - This is like the difference between working and not 
working.
                        - Also, this is what scares me about path-based 
traversers (match(), as(), path(), etc.). They can't be bulked easily.
                                - We will get smart here though. I have some 
inklings.
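                A toy sketch (plain Groovy, hypothetical sizes, nothing TinkerPop-specific) of why the combiner matters for something like g.V().count(): without it, every vertex ships its own Long to one reducer; with it, each partition pre-sums locally and ships a single Long.

                        // two partitions, one count of 1L per vertex (kept tiny here)
                        def partitions = [ (1..1000).collect { 1L },
                                           (1..1000).collect { 1L } ]
                        def withoutCombiner = partitions.flatten()               // every Long crosses the network
                        def withCombiner    = partitions.collect { it.sum() }    // one pre-summed Long per partition
                        assert withoutCombiner.sum() == withCombiner.sum()       // same answer, a tiny fraction of the shuffle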
        
        4. It's an absolute must that you partition your graph once it is loaded.
                - Once the graph is loaded (graphDB, HDFS, etc.), the Spark 
partitioner "organizes" the graph around the cluster and then persists it.
                        - For Friendster, this takes about 15 minutes (with 
ScriptInputFormat as the read-from-HDFS parser).
                        - This is important because the loaded graphRDD is 
static and just gets a view propagated through it at each iteration. You don't 
want to keep shuffling this monster on each iteration.
                        - This is also why PersistedXXXRDD is crucial. If you 
are going to keep running jobs on the same data, the RDD being reused is 
already partitioned for you! (tada) A config sketch follows below.
                                - For graph system providers: if you provide an 
InputRDD and you have a partitioner for it, that is a huge savings for 
SparkGraphComputer. It is smart to do so.
                - By partitioning upfront, I was able to reduce the shuffle 
load from ~22GB to ~2GB per vertex program iteration on Friendster. Insane.
                        - I was a fool before. I now know how to read Spark 
logs :), which is probably a good thing for me to know.
                - This is so important that we now just do it automatically.
                        - However, if the data source and the Spark cluster are 
already "pair partitioned", we don't repartition! (elegant)
                        - http://tinkerpop.incubator.apache.org/docs/3.1.1-SNAPSHOT/reference/#sparkgraphcomputer (scroll down to the "InputRDD and OutputRDD" section).
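                A minimal sketch of reusing the already-partitioned graphRDD across jobs, assuming the stock conf/hadoop/hadoop-gryo.properties and that I'm remembering the 3.1 configuration keys correctly (the linked reference doc is authoritative; the RDD name is hypothetical):

                        gremlin> // first job: keep the SparkContext alive and persist the loaded/partitioned graphRDD under a name:
                        gremlin> //   gremlin.spark.persistContext=true
                        gremlin> //   gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
                        gremlin> //   gremlin.hadoop.outputLocation=friendster-rdd    (hypothetical RDD name)
                        gremlin> graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
                        gremlin> graph.traversal(computer(SparkGraphComputer)).V().count()
                        gremlin> // later jobs: read the persisted RDD instead of HDFS -- it is already partitioned:
                        gremlin> //   gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD
                        gremlin> //   gremlin.hadoop.inputLocation=friendster-rdd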
        
        5. The graph data message/view shuffle is a lot of data. Make use of 
lots of TinkerPop workers() to reduce spills to disk. (A sketch follows below.)
                - TinkerPop 3.1.0 introduced GraphComputer.workers(). In 
SparkGraphComputer, this is the number of partitions in the RDD.
                - For Friendster, ScriptInputFormat gives me 229 partitions and 
g.V().count() takes 48 minutes.
                        - If I 5x this to 1145 using "workers(1145)", 
g.V().count() takes 25 minutes. 
                        - That's a 2x speed-up from just chopping the data into 
finer slices.
                        - However, for 2290 workers, g.V().count() only gets 
marginally better -- 23 minutes.
                - This is all about not spilling to disk and not getting GC all 
up in it.
                - Now imagine if the graph provider's InputRDD already has a 
partitioner -- you are looking at ~10 minutes to g.V().count() Friendster (or 
like 1 minute if we don't load edges)!
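                A sketch of dialing the worker count with the plain GraphComputer API. This is from memory of the 3.1 API, so double-check the builder call against the reference docs; PageRank is just an arbitrary VertexProgram so there is something to run:

                        gremlin> graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
                        gremlin> // workers(1145) => 1145 RDD partitions processed in parallel
                        gremlin> graph.compute(SparkGraphComputer).workers(1145).program(PageRankVertexProgram.build().create()).submit().get()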

        6. I think we need to make Gryo more efficient. I don't think our 
serialization is optimal :/. The data seems oversized for what it is. This is all 
assumption right now.
                - I also use JavaSerializer for tinkerpop.Payload data, and 
given that that is a significant chunk of what is shuffled -- it might be more 
that than Gryo. :|
                        - https://issues.apache.org/jira/browse/TINKERPOP-1110

        7. There is one last area of the implementation that I think could be 
improved. But besides that (and minor "use fewer objects"-style optimizations), 
I think SparkGraphComputer *is* how it should be.
                - https://issues.apache.org/jira/browse/TINKERPOP-1108
                - If you are a Spark expert, please do review the code and 
provide feedback. It's really not that much code.
                        - https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
                        - https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java

Anywho… that's that. TinkerPop 3.1.1 will be sweeeeet.

Enjoy!,
Marko.

http://markorodriguez.com
