We now have PartitionerInputFormat, which allows a Hadoop-based GraphComputer to pull data from any Graph implementation. PartitionerInputSplit is basically a wrapper around a Partition (its host address and id), so each split can be scheduled on the partition's host and data access stays data-local. For distributed Graph databases, PartitionerRecordReader will simply call Partition.vertices() and wrap each vertex in a VertexWritable. I have a test case that has SparkGraphComputer working over TinkerGraph. Pretty neato. Still lots more cleaning and work to do, but this is a good breather point.
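The mechanism described above can be modelled in plain Java without Hadoop or TinkerPop on the classpath. This is a minimal sketch, not the commit's code. `SketchPartition`, `SketchPartitioner`, `SketchSplit`, and `PartitionSplitSketch` are hypothetical names, vertices are plain strings, and assigning vertices by `hashCode()` modulo the split count is an assumption about how a hash partitioner might work. It shows the shape of the design: the driver emits one split per partition, each split carries only a host and a partition id, and the worker rebuilds its own partitioner and resolves the partition by id before streaming vertices. That lookup only works if ids are reproducible across processes, which is presumably why this commit replaces the random `UUID guid()` with a `String id()` derived from `graph.toString()` (global) or the base id plus `"#" + splitId` (hash). `DataOutput.writeUTF` stands in for Hadoop's `WritableUtils.writeString`; the two use different wire encodings.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for Partition: a stable id, a host address, and the vertex ids it owns. */
final class SketchPartition {
    final String id;
    final String host;
    final List<String> vertices;

    SketchPartition(final String id, final String host, final List<String> vertices) {
        this.id = id;
        this.host = host;
        this.vertices = vertices;
    }
}

/** Hypothetical hash partitioner: rebuilding it from the same inputs yields the same ids. */
final class SketchPartitioner {
    private final List<SketchPartition> partitions = new ArrayList<>();

    SketchPartitioner(final String graphId, final List<String> vertexIds, final int splits) {
        for (int splitId = 0; splitId < splits; splitId++) {
            final List<String> owned = new ArrayList<>();
            for (final String v : vertexIds)
                if (Math.floorMod(v.hashCode(), splits) == splitId)
                    owned.add(v);
            // Same id shape as HashPartition in this commit: base id + "#" + split number.
            partitions.add(new SketchPartition(graphId + "#" + splitId, "127.0.0.1", owned));
        }
    }

    List<SketchPartition> getPartitions() {
        return partitions;
    }

    /** Linear scan by id, mirroring the new Partitioner.getPartition(String) default method. */
    SketchPartition getPartition(final String id) {
        for (final SketchPartition p : partitions)
            if (p.id.equals(id))
                return p;
        throw new IllegalArgumentException("The provided partition does not exist in the partitioner");
    }
}

/** A split ships only host and id; fields are read back in the order they were written. */
final class SketchSplit {
    final String host;
    final String id;

    SketchSplit(final String host, final String id) {
        this.host = host;
        this.id = id;
    }

    byte[] toBytes() throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(host);
            out.writeUTF(id);
        } // closing the stream flushes it before the bytes are read out
        return bytes.toByteArray();
    }

    static SketchSplit fromBytes(final byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            final String host = in.readUTF();
            final String id = in.readUTF();
            return new SketchSplit(host, id);
        }
    }
}

final class PartitionSplitSketch {
    /** Like getSplits(): one split per partition. */
    static List<SketchSplit> getSplits(final SketchPartitioner partitioner) {
        final List<SketchSplit> splits = new ArrayList<>();
        for (final SketchPartition p : partitioner.getPartitions())
            splits.add(new SketchSplit(p.host, p.id));
        return splits;
    }

    /** Like the record reader: resolve the partition by id on the worker side and drain it. */
    static List<String> readSplit(final SketchPartitioner workerSide, final SketchSplit split) {
        final List<String> read = new ArrayList<>();
        final Iterator<String> it = workerSide.getPartition(split.id).vertices.iterator();
        while (it.hasNext())
            read.add(it.next()); // advance exactly once per record
        return read;
    }
}
```

The sketch advances the iterator in exactly one place. The diff's `PartitionerRecordReader` instead checks `hasNext()` in `nextKeyValue()` and calls `next()` inside `getCurrentValue()`, which assumes the framework calls `getCurrentValue()` exactly once per successful `nextKeyValue()`.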
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ad537ce7
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ad537ce7
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ad537ce7

Branch: refs/heads/TINKERPOP-1564
Commit: ad537ce73445812f22dc4a117144eeca892df3b6
Parents: 6353595
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Fri Dec 16 09:03:03 2016 -0700
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Mon Jan 23 14:22:53 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc | 3 +-
 .../gremlin/akka/process/actor/MasterActor.java | 2 +-
 .../gremlin/akka/process/actor/WorkerActor.java | 2 +-
 .../traversal/step/map/VertexProgramStep.java | 8 +-
 .../computer/util/GraphComputerHelper.java | 23 +++
 .../tinkerpop/gremlin/structure/Partition.java | 2 +-
 .../gremlin/structure/Partitioner.java | 8 +
 .../gremlin/structure/util/StringFactory.java | 2 +-
 .../util/partitioner/GlobalPartitioner.java | 12 +-
 .../util/partitioner/HashPartitioner.java | 12 +-
 .../io/partitioner/PartitionerInputFormat.java | 61 ++++++++
 .../io/partitioner/PartitionerInputSplit.java | 79 ++++++++++
 .../io/partitioner/PartitionerRecordReader.java | 72 +++++++++
 .../process/computer/SparkGraphComputer.java | 6 +-
 ...parkGraphPartitionerComputerProcessTest.java | 33 ++++
 .../TinkerGraphPartitionerProvider.java | 155 +++++++++++++++++++
 .../process/computer/TinkerGraphComputer.java | 14 +-
 17 files changed, 467 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 895fe85..6eae742 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -37,7 +37,8 @@ TinkerPop 3.3.0 (Release
Date: NOT OFFICIALLY RELEASED YET)
 * Added `ProcessTraversalStrategy` which is used to get cached strategies associated with a `Processor`.
 * Deprecated `Computer` in favor of `GraphComputer.open()`.
 * Deprecated `Graph.compute()` and `GraphComputer.submit()` in favor of `GraphComputer.submit(Graph)`.
->>>>>>> So much. withProcessor(Processor). No more Compute. Process.submit(Graph) as we are now staging it so that every Processor (GraphComputer/GraphAgents) can work over any Graph. This will all be via Partitioner. withComputer() deprecated. Lots of cool stuff with Process strategies -- ProcessorTraveralStrategy like VertexProgramStratgegy and ActorProgramStrategy work directly with TraversalStrategies.GlobalCache. So much other stuf... I forget. Check the CHANGELOG. This was a massive undertaking but, thank god, its all backwards compatible (though with deprecation).
+* Added `PartitionerInputFormat` which allows `SparkGraphComputer` and `GiraphGraphComputer` to pull data from any `Graph` implementation.
+* Added `PartitionerInputRDD` which allows `SparkGraphComputer` to pull data from any `Graph` implementation.
 * Updated Docker build scripts to include Python dependencies (NOTE: users should remove any previously generated TinkerPop Docker images).
 * Added "attachment requisite" `VertexProperty.element()` and `Property.element()` data in GraphSON serialization.
 * Added `Vertex`, `Edge`, `VertexProperty`, and `Property` serializers to Gremlin-Python and exposed tests that use graph object arguments.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java index a4ef639..aa31c28 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java @@ -61,7 +61,7 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ this.workers = new ArrayList<>(); final List<Partition> partitions = partitioner.getPartitions(); for (final Partition partition : partitions) { - final String workerPathString = "worker-" + partition.guid(); + final String workerPathString = "worker-" + partition.id(); this.workers.add(new Address.Worker(workerPathString, partition.location())); context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java index 35b5a4f..27f942a 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java @@ -106,7 +106,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ } private String 
createWorkerAddress(final Partition partition) { - return "../worker-" + partition.guid(); + return "../worker-" + partition.id(); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java index d005940..b02a725 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java @@ -64,7 +64,9 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com if (this.first && this.getPreviousStep() instanceof EmptyStep) { this.first = false; final Graph graph = this.getTraversal().getGraph().get(); - future = this.getComputer().apply(graph).program(this.generateProgram(graph, EmptyMemory.instance())).submit(); + future = (this.getComputer().getGraphComputerClass().equals(GraphComputer.class)) ? 
+ this.getComputer().apply(graph).program(this.generateProgram(graph, EmptyMemory.instance())).submit() : + GraphComputer.open(this.getComputer().configuration()).program(this.generateProgram(graph, EmptyMemory.instance())).submit(graph); final ComputerResult result = future.get(); this.processMemorySideEffects(result.memory()); return this.getTraversal().getTraverserGenerator().generate(result, this, 1l); @@ -72,7 +74,9 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com final Traverser.Admin<ComputerResult> traverser = this.starts.next(); final Graph graph = traverser.get().graph(); final Memory memory = traverser.get().memory(); - future = this.generateComputer(graph).program(this.generateProgram(graph, memory)).submit(); + future = (this.getComputer().getGraphComputerClass().equals(GraphComputer.class)) ? + this.getComputer().apply(graph).program(this.generateProgram(graph, memory)).submit() : + GraphComputer.open(this.getComputer().configuration()).program(this.generateProgram(graph, memory)).submit(graph); final ComputerResult result = future.get(); this.processMemorySideEffects(result.memory()); return traverser.split(result, this); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java index dce1934..840c5da 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java @@ -18,13 +18,16 @@ */ package org.apache.tinkerpop.gremlin.process.computer.util; +import 
org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.structure.Graph; import java.lang.reflect.Method; +import java.util.Iterator; import java.util.Optional; /** @@ -68,6 +71,26 @@ public final class GraphComputerHelper { return persist.isPresent() ? persist.get() : vertexProgram.isPresent() ? vertexProgram.get().getPreferredPersist() : GraphComputer.Persist.NOTHING; } + public static GraphComputer configure(GraphComputer computer, final Configuration configuration) { + final Iterator<String> keys = configuration.getKeys(); + while (keys.hasNext()) { + final String key = keys.next(); + if (key.equals(GraphComputer.WORKERS)) + computer = computer.workers(configuration.getInt(GraphComputer.WORKERS)); + else if (key.equals(GraphComputer.RESULT)) + computer = computer.result(GraphComputer.ResultGraph.valueOf(configuration.getString(GraphComputer.RESULT))); + else if (key.equals(GraphComputer.PERSIST)) + computer = computer.persist(GraphComputer.Persist.valueOf(configuration.getString(GraphComputer.PERSIST))); + else if (key.equals(GraphComputer.VERTICES)) + computer = computer.vertices((Traversal.Admin) configuration.getProperty(GraphComputer.VERTICES)); + else if (key.equals(GraphComputer.EDGES)) + computer = computer.edges((Traversal.Admin) configuration.getProperty(GraphComputer.EDGES)); + else if (!key.equals(GraphComputer.GRAPH_COMPUTER)) + computer = computer.configure(key, configuration.getProperty(key)); + } + return computer; + } + public static boolean areEqual(final MapReduce a, final Object b) { if (null == a) throw Graph.Exceptions.argumentCanNotBeNull("a"); 
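`GraphComputerHelper.configure()` above walks every configuration key and routes the well-known ones to typed builder calls. The following plain-Java sketch shows the same dispatch pattern under stated assumptions: `SketchComputer` and `ConfigureSketch` are hypothetical, a `Map` stands in for Commons Configuration, and the key strings are placeholders because the values of `GraphComputer.WORKERS`, `GraphComputer.PERSIST`, and `GraphComputer.GRAPH_COMPUTER` are not shown in this diff. Only two typed keys are handled here; the helper in the diff also covers result, vertices, and edges.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical fluent computer with just enough setters to show the dispatch. */
final class SketchComputer {
    enum Persist { NOTHING, VERTEX_PROPERTIES, EDGES }

    int workers = 1;
    Persist persist = Persist.NOTHING;
    final Map<String, Object> extra = new LinkedHashMap<>(); // pass-through settings

    SketchComputer workers(final int workers) {
        this.workers = workers;
        return this;
    }

    SketchComputer persist(final Persist persist) {
        this.persist = persist;
        return this;
    }

    SketchComputer configure(final String key, final Object value) {
        this.extra.put(key, value);
        return this;
    }
}

final class ConfigureSketch {
    // Placeholder key names (hypothetical); not the real GraphComputer constant values.
    static final String WORKERS = "sketch.workers";
    static final String PERSIST = "sketch.persist";
    static final String GRAPH_COMPUTER = "sketch.graphComputer";

    /** Typed keys go to typed setters, the class-selector key is skipped, the rest pass through. */
    static SketchComputer configure(SketchComputer computer, final Map<String, Object> configuration) {
        for (final Map.Entry<String, Object> entry : configuration.entrySet()) {
            final String key = entry.getKey();
            final Object value = entry.getValue();
            if (key.equals(WORKERS))
                computer = computer.workers(Integer.parseInt(value.toString()));
            else if (key.equals(PERSIST))
                computer = computer.persist(SketchComputer.Persist.valueOf(value.toString()));
            else if (!key.equals(GRAPH_COMPUTER))
                computer = computer.configure(key, value);
        }
        return computer;
    }
}
```

As in the diff, the loop reassigns `computer` after every call instead of relying on mutation, so it stays correct even if a builder method returns a new instance. The class-selector key is presumably skipped because it only chooses which computer to open and is not a setting for that computer.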
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java index dbb5260..f20b9fb 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java @@ -65,7 +65,7 @@ public interface Partition { * * @return the unique id of the partition */ - public UUID guid(); + public String id(); /** * Get the {@link InetAddress} of the locations physical location. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java index 1d4aae1..2e2cdb7 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java @@ -30,4 +30,12 @@ public interface Partitioner { public Partition getPartition(final Element element); + public default Partition getPartition(final String id) { + for(final Partition partition : this.getPartitions()) { + if(partition.id().equals(id)) + return partition; + } + throw new IllegalArgumentException("The provided partition does not exist in the partitioner"); + } + } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java 
---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java index 61d9551..f7c350d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java @@ -150,7 +150,7 @@ public final class StringFactory { } public static String partitionString(final Partition partition) { - return "partition" + L_BRACKET + partition.location().getHostAddress() + COLON + partition.guid() + R_BRACKET; + return "partition" + L_BRACKET + partition.location().getHostAddress() + COLON + partition.id() + R_BRACKET; } public static String traversalSourceString(final TraversalSource traversalSource) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java index 361750b..1d72a2d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java @@ -32,7 +32,6 @@ import java.net.UnknownHostException; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.UUID; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -63,11 +62,12 @@ public final class GlobalPartitioner implements Partitioner { private class GlobalPartition implements Partition { private final Graph graph; - private final UUID guid = UUID.randomUUID(); + private final String id; private final InetAddress location; private GlobalPartition(final Graph graph) { this.graph = graph; + this.id = this.graph.toString(); try { this.location = InetAddress.getLocalHost(); } catch (final UnknownHostException e) { @@ -97,17 +97,17 @@ public final class GlobalPartitioner implements Partitioner { @Override public boolean equals(final Object other) { - return other instanceof Partition && ((Partition) other).guid().equals(this.guid); + return other instanceof Partition && ((Partition) other).id().equals(this.id); } @Override public int hashCode() { - return this.guid.hashCode() + this.location.hashCode(); + return this.id.hashCode() + this.location.hashCode(); } @Override - public UUID guid() { - return this.guid; + public String id() { + return this.id; } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java index b3d3db7..15b4563 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java @@ -31,7 +31,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.UUID; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -72,12 +71,13 @@ public final class HashPartitioner implements Partitioner { private final Partition basePartition; private final int totalSplits; private final int splitId; - private final UUID guid = UUID.randomUUID(); + private final String id; private HashPartition(final Partition basePartition, final int splitId, final int totalSplits) { this.basePartition = basePartition; this.totalSplits = totalSplits; this.splitId = splitId; + this.id = this.basePartition.id() + "#" + splitId; } @Override @@ -102,17 +102,17 @@ public final class HashPartitioner implements Partitioner { @Override public boolean equals(final Object other) { - return other instanceof Partition && ((Partition) other).guid().equals(this.guid); + return other instanceof Partition && ((Partition) other).id().equals(this.id); } @Override public int hashCode() { - return this.guid.hashCode() + this.location().hashCode(); + return this.id.hashCode() + this.location().hashCode(); } @Override - public UUID guid() { - return this.guid; + public String id() { + return this.id; } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java new file mode 100644 index 0000000..2dae040 --- /dev/null +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class PartitionerInputFormat extends InputFormat<NullWritable, VertexWritable> { + @Override + public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException { + final Graph graph = GraphFactory.open(ConfUtil.makeApacheConfiguration(jobContext.getConfiguration())); + final List<Partition> partitions = graph.partitioner().getPartitions(); + final List<InputSplit> inputSplits = new ArrayList<>(partitions.size()); + for (final Partition partition : partitions) { + inputSplits.add(new PartitionerInputSplit(partition)); + } + return inputSplits; + } + + @Override + public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + final PartitionerRecordReader reader = new PartitionerRecordReader(); + reader.initialize(inputSplit, taskAttemptContext); + return reader; + } + + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java new file mode 100644 index 0000000..923f8b3 --- /dev/null +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.tinkerpop.gremlin.structure.Partition; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class PartitionerInputSplit extends InputSplit implements Writable { + + private String location; + private String id; + + public PartitionerInputSplit() { + // necessary for serialization/writable + } + + public PartitionerInputSplit(final Partition partition) { + this.location = partition.location().getHostAddress(); + this.id = partition.id(); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return 1; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[]{this.location}; + } + + public String getPartitionId() { + return this.id; + } + + @Override + public void write(final DataOutput dataOutput) throws IOException { + WritableUtils.writeString(dataOutput, this.location); + WritableUtils.writeString(dataOutput, this.id); + } + + @Override + public void readFields(final DataInput dataInput) throws IOException { + this.location = 
WritableUtils.readString(dataInput); + this.id = WritableUtils.readString(dataInput); + } + + + @Override + public String toString() { + return this.location + "::" + this.id; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java new file mode 100644 index 0000000..c6de8ab --- /dev/null +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; + +import java.io.IOException; +import java.util.Iterator; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class PartitionerRecordReader extends RecordReader<NullWritable, VertexWritable> { + + private Iterator<Vertex> vertices; + + @Override + public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + final Graph graph = GraphFactory.open(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration())); + this.vertices = graph.partitioner().getPartition(((PartitionerInputSplit) inputSplit).getPartitionId()).vertices(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return this.vertices.hasNext(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public VertexWritable getCurrentValue() throws IOException, InterruptedException { + return new VertexWritable(this.vertices.next()); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java 
---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index 61c9663..cbcdfe7 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -53,6 +53,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult; +import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper; import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; @@ -143,8 +144,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { @Override public Future<ComputerResult> submit(final Graph graph) { - this.hadoopGraph = (HadoopGraph) graph; - ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration); + ConfigurationUtils.copy(graph.configuration(), this.sparkConfiguration); return this.submit(); } @@ -154,7 +154,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { } public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration configuration) { - return HadoopGraph.open(configuration).compute(SparkGraphComputer.class); + return new SparkGraphComputer(HadoopGraph.open(configuration)); } private Future<ComputerResult> submitWithExecutor(Executor exec) { 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java new file mode 100644 index 0000000..7c5b3e3 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.structure.io.partitioner; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.junit.runner.RunWith; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +@RunWith(ProcessComputerSuite.class) +@GraphProviderClass(provider = TinkerGraphPartitionerProvider.class, graph = TinkerGraph.class) +public class SparkGraphPartitionerComputerProcessTest { +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java new file mode 100644 index 0000000..941c942 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.partitioner;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
+import org.apache.tinkerpop.gremlin.GraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner.PartitionerInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
+public class TinkerGraphPartitionerProvider extends AbstractGraphProvider {
+
+    private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
+            "testProfileStrategyCallback",
+            "testProfileStrategyCallbackSideEffect",
+            "shouldSucceedWithProperTraverserRequirements",
+            "shouldFailWithImproperTraverserRequirements"));
+
+    private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{
+        add(TinkerEdge.class);
+        add(TinkerElement.class);
+        add(TinkerGraph.class);
+        add(TinkerGraphVariables.class);
+        add(TinkerProperty.class);
+        add(TinkerVertex.class);
+        add(TinkerVertexProperty.class);
+    }};
+
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName,
+            final LoadGraphWith.GraphData loadGraphWith) {
+
+        final TinkerGraph.DefaultIdManager idManager = selectIdMakerFromGraphData(loadGraphWith);
+        final String idMaker = (idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? selectIdMakerFromGraphData(loadGraphWith) : idManager).name();
+        final boolean skipTest = SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName());
+        return new HashMap<String, Object>() {{
+            put(Graph.GRAPH, TinkerGraph.class.getName());
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker);
+            put("skipTest", skipTest);
+            if (loadGraphWith == LoadGraphWith.GraphData.CREW)
+                put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name());
+            put("spark.master", "local[4]");
+            put("spark.serializer", GryoSerializer.class.getCanonicalName());
+            put("spark.kryo.registrationRequired", true);
+            put(Constants.GREMLIN_HADOOP_GRAPH_READER, PartitionerInputFormat.class.getCanonicalName());
+            put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+            put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
+            put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+            put(GraphComputer.GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+            put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+            // System.out.println(AbstractGremlinTest.class.getResource(loadGraphWith.location()).toString().replace("file:",""));
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, AbstractGremlinTest.class.getResource(loadGraphWith.location()).toString().replace("file:", ""));
+        }};
+
+
+    }
+
+    protected void readIntoGraph(final Graph g, final String path) throws IOException {
+
+    }
+
+    @Override
+    public void clear(final Graph graph, final Configuration configuration) throws Exception {
+        //if (graph != null)
+        //    graph.close();
+    }
+
+    @Override
+    public Set<Class> getImplementations() {
+        return IMPLEMENTATION;
+    }
+
+    /**
+     * Test that load with specific graph data can be configured with a specific id manager as the data type to
+     * be used in the test for that graph is known.
+     */
+    protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) {
+        if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY;
+        if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else
+            throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name()));
+    }
+
+    /////////////////////////////
+    /////////////////////////////
+    /////////////////////////////
+
+    @Override
+    public GraphTraversalSource traversal(final Graph graph) {
+        if ((Boolean) graph.configuration().getProperty("skipTest"))
+            return graph.traversal().withComputer();
+        else {
+            return graph.traversal().withProcessor(SparkGraphComputer.open(graph.configuration()));
+        }
+    }
+
+    @Override
+    public GraphComputer getGraphComputer(final Graph graph) {
+        return SparkGraphComputer.open(graph.configuration());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ad537ce7/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 82cb934..cbbe53f 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -69,7 +69,7 @@ public final class TinkerGraphComputer implements GraphComputer {
     private Persist persist = null;
 
     private VertexProgram<?> vertexProgram;
-    private final TinkerGraph graph;
+    private TinkerGraph graph;
     private TinkerMemory memory;
     private final TinkerMessageBoard messageBoard = new TinkerMessageBoard();
     private boolean executed = false;
@@ -97,6 +97,7 @@ public final class TinkerGraphComputer implements GraphComputer {
         this.graph = null;
         this.configuration = configuration;
         this.configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName());
+        GraphComputerHelper.configure(this, ConfigurationUtils.cloneConfiguration(configuration));
     }
 
     public static TinkerGraphComputer open(final Configuration configuration) {
@@ -104,10 +105,7 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     public static TinkerGraphComputer open() {
-        final BaseConfiguration configuration = new BaseConfiguration();
-        configuration.setDelimiterParsingDisabled(true);
-        configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName());
-        return new TinkerGraphComputer(configuration);
+        return new TinkerGraphComputer(new BaseConfiguration());
     }
 
     @Override
@@ -164,6 +162,12 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
+    public Future<ComputerResult> submit(final Graph graph) {
+        this.graph = (TinkerGraph) graph;
+        return this.submit();
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         // a graph computer can only be executed once
         if (this.executed)