GitHub user okram opened a pull request:
https://github.com/apache/tinkerpop/pull/506
TINKERPOP-1389: Support Spark 2.0.0
https://issues.apache.org/jira/browse/TINKERPOP-1389
This was a crazy rollercoaster ride, but we now have three good things:
1. A more efficient serialization model in `SparkGraphComputer` using
`KryoSerializer` (Spark) and `GryoRegistrator` (TinkerPop). This always
existed, but it was not the default and had some issues that needed to be
resolved (see the configuration sketch after this list).
2. Support for Spark 2.0.0. Spark 1.x has a different API and different
`<dependencies/>`, so this was pretty significant.
3. A test infrastructure in hadoop-gremlin that verifies that `IoRegistry`
works in both Giraph and Spark. Prior to this, custom serializers
weren't tested, nor did they work correctly!
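As a quick, hedged illustration (not code from this PR), here is a minimal Gremlin
Console sketch of enabling the new model. The input location and spark.master values
are placeholders, the Spark plugin is assumed to be active, and swapping the last two
properties for `spark.serializer=...GryoSerializer` gives the old model instead:
```
// minimal sketch: HadoopGraph over a Gryo input, computed with SparkGraphComputer
// using Spark's KryoSerializer plus TinkerPop's GryoRegistrator (the new model)
conf = new BaseConfiguration()
conf.setProperty('gremlin.graph', 'org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph')
conf.setProperty('gremlin.hadoop.graphReader', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat')
conf.setProperty('gremlin.hadoop.inputLocation', 'hdfs://.../friendster.kryo')   // placeholder path
conf.setProperty('spark.master', 'yarn')                                         // cluster-specific
conf.setProperty('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
conf.setProperty('spark.kryo.registrator', 'org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator')
graph = GraphFactory.open(conf)
g = graph.traversal().withComputer(SparkGraphComputer)
g.V().count()
```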
This work is primarily the effort of @yucx (Spark 2.0 compliance) and
@dalaro (serialization infrastructure). I simply mediated their respective
contributions and cleaned/organized/tested things accordingly.
Here are the benchmarks against Friendster (2.5 billion edges). Note that each
TinkerPop 3.3.x line lists two times: the first with `GryoSerializer` (the
old model) and the second with `GryoRegistrator` (the new model).
```
g.V().count() -- answer 125000000 (125 million vertices)
- TinkerPop 3.0.0.MX: 2.5 hours
- TinkerPop 3.0.0: 1.5 hours
- TinkerPop 3.1.1: 23 minutes
- TinkerPop 3.2.0: 6.8 minutes (Spark 1.5.2)
- TinkerPop 3.2.0: 5.5 minutes (Spark 1.6.1)
- TinkerPop 3.2.1: 2.2 minutes (Spark 1.6.1)
- TinkerPop 3.3.x: 1.7/1.9 minutes (Spark 2.0.0)
g.V().out().count() -- answer 2586147869 (2.5 billion length-1 paths (i.e. edges))
- TinkerPop 3.0.0.MX: unknown
- TinkerPop 3.0.0: 2.5 hours
- TinkerPop 3.1.1: 1.1 hours
- TinkerPop 3.2.0: 13 minutes (Spark 1.5.2)
- TinkerPop 3.2.0: 12 minutes (Spark 1.6.1)
- TinkerPop 3.2.1: 2.4 minutes (Spark 1.6.1)
- TinkerPop 3.3.x: 2.2/2.2 minutes (Spark 2.0.0)
g.V().out().out().count() -- answer 640528666156 (640 billion length-2 paths)
- TinkerPop 3.2.0: 55 minutes (Spark 1.5.2)
- TinkerPop 3.2.0: 50 minutes (Spark 1.6.1)
- TinkerPop 3.2.1: 37 minutes (Spark 1.6.1)
- TinkerPop 3.3.x: 42/27 minutes (Spark 2.0.0)
g.V().out().out().out().count() -- answer 215664338057221 (215 trillion length-3 paths)
- TinkerPop 3.0.0.MX: 12.8 hours
- TinkerPop 3.0.0: 8.6 hours
- TinkerPop 3.1.1: 2.4 hours
- TinkerPop 3.2.0: 1.6 hours (Spark 1.5.2)
- TinkerPop 3.2.0: 1.5 hours (Spark 1.6.1)
- TinkerPop 3.2.1: 1.1 hours (Spark 1.6.1)
- TinkerPop 3.3.x: 1.4/0.8 hours (Spark 2.0.0)
g.V().out().out().out().out().count() -- answer 83841426570464575 (83 quadrillion length-4 paths)
- TinkerPop 3.2.0: 2.1 hours (Spark 1.6.1)
- TinkerPop 3.2.1: 1.7 hours (Spark 1.6.1)
- TinkerPop 3.3.x: 2.0/1.2 hours (Spark 2.0.0)
g.V().out().out().out().out().out().count() -- answer -2280190503167902456 !! I blew the long space -- 64-bit overflow.
- TinkerPop 3.2.0: 2.8 hours (Spark 1.6.1)
- TinkerPop 3.2.1: 2.2 hours (Spark 1.6.1)
- TinkerPop 3.3.x: 2.6/1.6 hours (Spark 2.0.0)
g.V().group().by(outE().count()).by(count()).
- TinkerPop 3.2.0: 12 minutes (Spark 1.6.1)
- TinkerPop 3.2.1: 2.4 minutes (Spark 1.6.1)
- TinkerPop 3.3.x: 3.2/3.2 minutes (Spark 2.0.0)
g.V().groupCount().by(outE().count())
- TinkerPop 3.2.0: 12 minutes (Spark 1.6.1)
- TinkerPop 3.2.1: 2.7 minutes (Spark 1.6.1)
- TinkerPop 3.3.x: 2.2/2.2 minutes (Spark 2.0.0)
```
VOTE +1.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/apache/tinkerpop TINKERPOP-1389
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/tinkerpop/pull/506.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #506
----
commit f0c5a5f133635215b96ba3f2b7431706e23938b3
Author: yucx <[email protected]>
Date: 2016-09-02T06:11:58Z
TINKERPOP-1389 Support Spark 2.0
commit 2321117c1fb9f5927569d9d61fa28250916b4807
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-09-12T18:22:05Z
GryoSerializer uses HadoopPools so that gryo pools are not constantly
produced (object-reuse style). This has increased the performance of
GryoSerializer-based jobs to that of the 3.2.x line prior to the bump to Spark
2.0.
commit 0b7d71b17b00355567f18f289fcefd5c6e09d1ea
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-09-13T00:04:30Z
Jesus. That was a dependency hell like only Spark can provide. Got
SparkServer working with GremlinConsole and HDFS. Phew... Also, bumped to
Hadoop 2.7.3 (from 2.7.2).
commit ef18e4ecaebf5132d37b4e52e6fa7112b79b5a26
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-09-13T14:20:22Z
Had to go back down to Hadoop 2.7.2 because Hadoop 2.7.3 changed a method
signature that GiraphJob calls, and thus Giraph fails. Darn. Cleaned up the
pom.xml files a bit and updated the CHANGELOG. This branch is ready to be tested
on the Blade cluster against the Friendster dataset.
commit cafeefce8a112445cda6930ba3c51eb9a4dc5ef2
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-24T19:31:58Z
@dalaro provided an update to KryoShimServiceLoader via a gist, as the log
message was not very clear.
commit 4489247fc191bfb014d6ed4e581e0acbbf407cf4
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-24T21:21:23Z
If System.getProperty() has a shim service set, then it is also set as
a DRIVER_JAVA_OPTIONS and an EXECUTOR_JAVA_OPTIONS in SparkGraphComputer (this
is necessary for Spark 2.0). Also, some is.testing class registrations are now
needed outside of testing as well -- a Spark 2.0 anomaly.
commit 88a7c45eada0254d2163f0fee2608ce82b14e584
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-25T14:02:07Z
UnshadedKryoShimService is now the default for SparkGraphComputer. Users no
longer have to System.setProperty() as it's all handled in the
SparkGraphComputer constructor. I would really like to get rid of the 'service
model' (cc @dalaro) as UnshadedKryoShimService only works with Spark and thus
we can't have it leak over to Giraph (and other Hadoop-based graph
computers). It's leak-free right now, but in general it would be good to not
even have this as a priority/service thing. Trying to think how to do this....
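In other words, a manual step like the one below (property name as listed in the last
commit of this PR; the class is assumed to be importable in the console) should no
longer be required:
```
// previously needed by hand before submitting a job; now defaulted in the
// SparkGraphComputer constructor when no shim service is configured
System.setProperty('gremlin.io.kryoShimService', UnshadedKryoShimService.class.getCanonicalName())
```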
commit 39198753ba6993164ad605c175f03ed14338a627
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-25T16:33:18Z
Minor tweak to GiraphGraphComputer (nothing significant).
commit f2606364af3a1bf3b9815b2499a1181082ea414f
Author: Dan LaRocque <[email protected]>
Date: 2016-10-26T00:37:17Z
Avoid starting VP worker iterations that never end
SparkExecutor.executeVertexProgramIteration was written in such a way
that an empty RDD partition would cause it to invoke
VertexProgram.workerIterationStart without ever invoking
VertexProgram.workerIterationEnd. This seems like a contract
violation. I have at least one VP that relies on
workerIterationStart|End to allocate and release resources. Failing
to invoke End like this causes a leak in that VP, as it would for any
VP that uses that resource management pattern.
(cherry picked from commit 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9;
this is the same change, except it tracks a switch between Spark 1.6
and 2.0 away from functions that manipulate iterables to those that
manipulate iterators)
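As an aside, here is a minimal sketch of the start/end contract this fix restores
(the workerIterationStart/End method names are from the VertexProgram API cited
above; the surrounding partition handling is illustrative, not the actual
SparkExecutor code):
```
// skip empty partitions entirely, and always balance workerIterationStart with
// workerIterationEnd so resource-managing VertexPrograms don't leak
def executeWorkerIteration(vertexProgram, memory, partitionIterator) {
    if (!partitionIterator.hasNext())
        return                               // nothing to do -- never call Start without End
    vertexProgram.workerIterationStart(memory)
    try {
        partitionIterator.each { vertex ->
            // ... execute the vertex program on each vertex of the partition ...
        }
    } finally {
        vertexProgram.workerIterationEnd(memory)
    }
}
```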
commit 46d402fb4e3e7ec43f8796578358a24680bd0afa
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-26T01:44:45Z
Decoupled GryoRecordReader/Writer from KryoShimService. The shim service
should ONLY be for inter-process communication -- not input/output formats. cc/
@dalaro
commit 4fcb2db39e823cedd8faa6708426c9f554a01f34
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-26T13:18:21Z
Added SystemUtil to gremlin-core, which is able to generate Configurations
from System properties. Useful in Hadoop GraphComputer settings where JVMs can
have IO configuration information in them. HadoopPools will now try to
generate a Configuration from System properties if no configuration is
explicitly provided. Likewise for KryoShimServices (via KryoShimServiceLoader).
SparkGraphComputer will spawn its driver and executors using the gremlin. and
spark. System properties taken from the SparkConf. Lots of knick-knack cleanups
to the shim service scene. cc/ @dalaro.
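A rough sketch of the idea behind SystemUtil follows; the method name, prefix
handling, and whether the prefix is stripped from the keys are all assumptions on my
part, not the actual gremlin-core API:
```
// Gremlin Console sketch: build an Apache Commons Configuration from JVM system
// properties carrying a given prefix (prefix stripping is an assumption here)
def configurationFromSystemProperties(String prefix) {
    def conf = new BaseConfiguration()
    System.properties.each { k, v ->
        def key = k.toString()
        if (key.startsWith(prefix))
            conf.setProperty(key.substring(prefix.length()), v)
    }
    return conf
}

// e.g. a JVM started with -Dtinkerpop.gremlin.io.registry=... would yield a
// Configuration containing gremlin.io.registry (prefix per the next commit)
ioConf = configurationFromSystemProperties('tinkerpop.')
```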
commit 68ccbb2073e81f7c08a17f3cc191ca90b1d9e89f
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-26T14:04:38Z
Using tinkerpop as the prefix for System property configurations. Tweaked
the KryoShimServiceLoader's logging information so it's easier to see what's
going on -- doing cluster testing.
commit a0fa7c6e5e6c9ee41aedf7e311e9f3f75a6dd8c5
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-26T16:13:40Z
Really simplified UnshadedKryoShimService and
IoRegistryAwareKryoSerializer. Also, introduced a synchronization point in
KryoShimServiceLoader.applyConfiguration() as I believe multiple threads are
creating a service over and over again. Hopefully this doesn't create a
bottleneck. Going to test on the cluster.
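The synchronization point amounts to lazily initializing one shared service; here is a
generic sketch of that pattern (class, field, and method names are illustrative, not
the actual loader internals):
```
// double-checked lazy initialization: many executor threads may race to apply the
// configuration, but only one should pay the cost of creating the shim service
class ShimHolder {
    private static volatile Object service

    static Object applyConfiguration(conf) {
        if (service == null) {
            synchronized (ShimHolder) {
                if (service == null)
                    service = createService(conf)   // expensive: ServiceLoader scan, Kryo setup
            }
        }
        return service
    }

    private static Object createService(conf) {
        new Object()   // stand-in for the real ServiceLoader lookup + configuration
    }
}
```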
commit 0b799806120ce49122a76be055d6d53e11896cfa
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-26T20:21:42Z
Lots of good stuff here -- finally have testing of IoRegistry in
SparkGraphComputer. Have ToyPoint and TestIoRegistry. Realized there were a bunch
of stupid .flush() calls in the GryoSerializer serializers (Spark)... perhaps that
is why things are slower than with KryoSerializer. Really cleaned up
IoRegistryAwareKryoSerializer. Added getShim() methods to the
Shaded/UnshadedSerializerAdaptors. I'm a stud. cc/ @dalaro
commit a30bdfa1286266c942abebce6c75f16002802320
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-26T23:11:54Z
Added AbstractIoRegistryCheck that verifies the IoRegistry of test types
(ToyPoint and ToyTriangle) works for OLTP, OLAP, and Storage streaming on all
Hadoop-based graphs (Giraph/Spark). Currently, it only checks Gryo registration,
but it is set up to do GraphSON next as well. This gives us confidence that the
HadoopPools, GryoRegistrator, etc. models all work as expected regarding IO
registration. Had to tweak GryoMapper such that IoRegistries are added
before initializeMapper() runs. This ensures proper ID ordering between the
various serializers. Will need to talk with @spmallette to make sure this is
kosher. If not, I have another way of getting the same behavior. Stuff is
looking REALLY solid. TinkerPop 3.3.0 Spark/Giraph is going to be bumpin'.
commit 07503d6e8aa812eb94c1905b074c32718d4bd347
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-27T01:00:18Z
Weird necessity. Will figure out the cause later.
commit b6954f98e71bfe120148b016b7f797e7b15b7e5a
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-27T11:27:48Z
KryoShimServices now implement .close() so they can relinquish resources.
KryoShimServiceLoader also implements close() and will close any static shim
service it maintains. On a force reload, any existing KryoShimService is
close()d. This was necessary for the test suite, as different IoRegistries were
being loaded and dangling Kryo objects didn't have the loaded registries. Reverted
GryoMapper to how it was, and now the Spark-specific classes in GryoSerializer
are simply an IoRegistry -- ta da. Going to do GraphSON IoRegistry testing in
Giraph and Spark and then I think we are done with this branch. cc/ @dalaro.
commit 04efe0c5cd72310f4c87e094c851b62f50d87e1f
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-27T11:28:58Z
Dah, sorry. Had some work I started doing before the previous commit.
Reverting.
commit 6bc8aed9230061e0165a48eb89009fe2c1a53bd1
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-27T19:28:07Z
Okay, so I found a bug that has to do with joins() in Spark and
IoRegistry.... I thought this whole time it was my doing -- but then I realized
that I had added a .out() to the test. I reverted to an older version of the
branch and added a .out(): it failed. GryoRegistrator is being used as the Kryo
source, not IoAwareKryoSerializer. It happens in the shuffle 'threads' and skips
anything regarding KryoShimServiceLoader. I am so brain-fried -- hopefully I
realize the solution. For now, everything else is really good.
commit 18eae6d21125376e447628972ca9bd294984d1ef
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-10-27T21:32:04Z
No more IoAwareSerializer -- it doesn't work in Spark 2.0's serialization
model. GryoRegistrator is now responsible for handling IoRegistry registration.
Much simpler. Finally, everything is passing -- both GryoSerializer and
Kryo+GryoRegistrator. What a horrendous day. cc/ @dalaro.
commit f7b71b376fef46327b25bc4800860cfb404c8613
Author: Dan LaRocque <[email protected]>
Date: 2016-11-17T16:33:58Z
Limit JVM system props passed around in Spark jobs
Prior to this commit, SGC indiscriminately set the entire job config
as JVM system properties on Spark executors. It didn't account for
the fact that some config values (e.g. spark.job.description) could
have spaces. A value with a space wouldn't get quoted. This led to
Spark workers failing to start, because part of the unquoted value
would be erroneously interpreted as the JVM main class.
This commit makes SGC only pass two config settings as system props:
* gremlin.io.registry
* gremlin.io.kryoShimService
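For illustration, the forwarding amounts to something like the following
(spark.driver/executor.extraJavaOptions are the standard Spark settings for passing
-D flags to the JVMs; the exact constants and wiring inside SGC are not shown, and
sparkConf here stands for the job's SparkConf):
```
// only the two whitelisted, space-free keys become -D options; a value like
// spark.job.description could contain spaces and would break the unquoted
// executor command line if forwarded blindly
sparkConf = new org.apache.spark.SparkConf()   // normally built by SparkGraphComputer
def keys = ['gremlin.io.registry', 'gremlin.io.kryoShimService'].findAll { sparkConf.contains(it) }
def javaOpts = keys.collect { "-D${it}=${sparkConf.get(it)}" }.join(' ')
sparkConf.set('spark.driver.extraJavaOptions', javaOpts)
sparkConf.set('spark.executor.extraJavaOptions', javaOpts)
```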
commit 41892072261b8bf3ccdd9d8e3f44c8706dc1474e
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-11-29T12:51:10Z
Tweaks to IoRegistryHelper now that instance() is the default instead of
getInstance(). Minor tweaks here and there otherwise. Rebased with master.
commit b8a2452deb7f6a23fae29684c1ead2ed35d7cb69
Author: Marko A. Rodriguez <[email protected]>
Date: 2016-11-29T13:00:19Z
Fixed a bug in the CHANGELOG and added a few more things.
----