Hello TinkerPop devs, over the last couple of days we've implemented a BulkLoaderVertexProgram for TinkerPop3. I know a lot of people have been waiting for it, and I guess that - once it's released - it won't take very long until Stephen and I continue the Powers of Ten <http://thinkaurelius.com/2014/05/29/powers-of-ten-part-i/> blog post series.
The TinkerPop3 BulkLoaderVertexProgram comes with an IncrementalBulkLoader implementation that is used by default; however, it's easy to plug in your own customized bulk loader implementation. The vertex program supports all the input formats you're already familiar with (GraphSON, Gryo, Script). As a target graph you can use any graph that supports multiple concurrent connections (unfortunately, that restriction disqualifies Neo4j, as its current TP3 implementation does not support the HA mode).

Let me walk you through a simple example that loads the Grateful Dead graph into Titan.

*Prerequisites*

- TinkerPop3 (development branch: blvp)
- Titan 0.9 (customized build)
- a running Hadoop (pseudo) cluster
- Cassandra 2.1.x (for this particular example, as I'm going to use Titan/Cassandra)

*Build TinkerPop3 from source*

git clone https://github.com/apache/incubator-tinkerpop.git
cd incubator-tinkerpop
git checkout blvp
mvn clean install -DskipTests

*Build Titan from source*

git clone https://github.com/thinkaurelius/titan.git
cd titan
sed 's@<tinkerpop.version>.*</tinkerpop.version>@<tinkerpop.version>3.0.1-SNAPSHOT</tinkerpop.version>@' pom.xml > pom.xml.new
mv pom.xml.new pom.xml
mvn clean install -DskipTests

*Copy the Grateful Dead files to HDFS*

cd incubator-tinkerpop
find . -name script-input-grateful-dead.groovy | head -n1 | xargs -I {} hadoop fs -copyFromLocal {} script-input-grateful-dead.groovy
find . -name grateful-dead.txt | head -n1 | xargs -I {} hadoop fs -copyFromLocal {} grateful-dead.txt

*Create 2 configuration files - one for Titan/Cassandra and one for Hadoop / the BulkLoader*

*titan-cassandra.properties*

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=cassandrathrift
storage.hostname=127.0.0.1

*hadoop-script.properties*

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=grateful-dead.txt
gremlin.hadoop.scriptInputFormat.script=script-input-grateful-dead.groovy
gremlin.hadoop.outputLocation=output

# Bulk Loader configuration
gremlin.bulkLoaderVertexProgram.loader.class=org.apache.tinkerpop.gremlin.process.computer.bulkloading.IncrementalBulkLoader
gremlin.bulkLoaderVertexProgram.loader.vertexIdProperty=bulkloader.vertex.id
gremlin.bulkLoaderVertexProgram.loader.userSuppliedIds=false
gremlin.bulkLoaderVertexProgram.loader.keepOriginalIds=false
gremlin.bulkLoaderVertexProgram.graph.class=com.thinkaurelius.titan.core.TitanFactory
gremlin.bulkLoaderVertexProgram.graph.storage.backend=cassandrathrift
gremlin.bulkLoaderVertexProgram.graph.storage.hostname=127.0.0.1
gremlin.bulkLoaderVertexProgram.graph.storage.batch-loading=true
gremlin.bulkLoaderVertexProgram.intermediateBatchSize=10000

spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer

*Create the Titan schema*

cd titan
bin/gremlin.sh

graph = GraphFactory.open("titan-cassandra.properties")
m = graph.openManagement()

// vertex labels
artist = m.makeVertexLabel("artist").make()
song = m.makeVertexLabel("song").make()

// edge labels
sungBy = m.makeEdgeLabel("sungBy").make()
writtenBy = m.makeEdgeLabel("writtenBy").make()
followedBy = m.makeEdgeLabel("followedBy").make()

// vertex and edge properties
blid = m.makePropertyKey("bulkloader.vertex.id").dataType(Long.class).make()
name = m.makePropertyKey("name").dataType(String.class).make()
songType = m.makePropertyKey("songType").dataType(String.class).make()
performances = m.makePropertyKey("performances").dataType(Integer.class).make()
weight = m.makePropertyKey("weight").dataType(Integer.class).make()

// global indices
m.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
m.buildIndex("artistsByName", Vertex.class).addKey(name).indexOnly(artist).buildCompositeIndex()
m.buildIndex("songsByName", Vertex.class).addKey(name).indexOnly(song).buildCompositeIndex()

// vertex centric indices
m.buildEdgeIndex(followedBy, "followedByTime", Direction.BOTH, Order.decr, weight)

m.commit()
graph.close()

Up to this point it's the usual stuff that we do all day long: create configurations, create schemas, mess with Hadoop... All in all, nothing special.

*Here comes the new part - start the BulkLoaderVertexProgram*

blgr = GraphFactory.open("hadoop-script.properties")
blvp = BulkLoaderVertexProgram.build().create(blgr)
blgr.compute(SparkGraphComputer).program(blvp).submit().get()

Note that you don't have to have the Bulk Loader configuration embedded in your Hadoop graph configuration file; you can also do:

blgr = GraphFactory.open("hadoop-script.properties")
blvp = BulkLoaderVertexProgram.build().configure( // default values not included
    "loader.vertexIdProperty", "bulkloader.vertex.id",
    "loader.keepOriginalIds", false,
    "graph.class", "com.thinkaurelius.titan.core.TitanFactory",
    "graph.storage.backend", "cassandrathrift",
    "graph.storage.hostname", "127.0.0.1",
    "graph.storage.batch-loading", true,
    "intermediateBatchSize", 10000
).create(blgr)
blgr.compute(SparkGraphComputer).program(blvp).submit().get()

...or simply mix both approaches.
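Once the job has finished, you can do a quick sanity check from the Gremlin Console. This is just a sketch (it assumes the titan-cassandra.properties file from above and a running Titan/Cassandra instance); the id value in the last traversal is a made-up example:

```groovy
// open the target graph and count what was loaded
graph = GraphFactory.open("titan-cassandra.properties")
g = graph.traversal()
g.V().count()  // should match the number of vertices in the source data
g.E().count()  // should match the number of edges in the source data

// the original source ids are preserved in the configured vertexIdProperty,
// backed by the byBulkLoaderVertexId index created above (example id: 89L)
g.V().has("bulkloader.vertex.id", 89L).valueMap()
graph.close()
```

Since the default loader is incremental, re-running the vertex program against the same input should leave these counts unchanged - existing elements are matched and updated rather than duplicated.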
Play around with it and let us know what you think. If we get enough positive feedback / no negative feedback, the BulkLoaderVertexProgram will make it into the next TinkerPop release (3.0.1) and thus also into the next Titan release.

Cheers,
Daniel
