Repository: incubator-tinkerpop Updated Branches: refs/heads/TINKERPOP-1310 7255844c0 -> e6f2caa89
Lots more documentation on TraversalVertexProgram. I really combed through the code and was able to find numerous minor optimizations here and there. Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e6f2caa8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e6f2caa8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e6f2caa8 Branch: refs/heads/TINKERPOP-1310 Commit: e6f2caa89adc8e6630dadc613ca9b6b92416c223 Parents: 7255844 Author: Marko A. Rodriguez <[email protected]> Authored: Wed May 25 14:54:15 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Wed May 25 14:54:15 2016 -0600 ---------------------------------------------------------------------- .../computer/traversal/MasterExecutor.java | 47 +++++++++----------- .../traversal/TraversalVertexProgram.java | 2 + .../computer/traversal/WorkerExecutor.java | 32 ++++++------- .../gremlin/hadoop/structure/HadoopGraph.java | 2 +- .../HaltedTraverserFactoryStrategyTest.java | 7 ++- 5 files changed, 43 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java index b994f1e..1c1e9d2 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java @@ -43,7 +43,6 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; import org.apache.tinkerpop.gremlin.structure.util.Attachable; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.util.HashSet; import java.util.Iterator; @@ -70,20 +69,17 @@ final class MasterExecutor { return traverser; } - // handle traversers and data that were sent from the workers to the master traversal via memory - protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> traverserSet, final Set<String> completedBarriers) { + protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> toProcessTraversers, final Set<String> completedBarriers) { + // handle traversers and data that were sent from the workers to the master traversal via memory if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) { for (final String key : memory.<Set<String>>get(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) { final Step<Object, Object> step = traversalMatrix.getStepById(key); - if (null == step) continue; // why? how can this happen? 
assert step instanceof Barrier; completedBarriers.add(step.getId()); if (!(step instanceof LocalBarrier)) { // local barriers don't do any processing on the master traversal (they just lock on the workers) final Barrier<Object> barrier = (Barrier<Object>) step; barrier.addBarrier(memory.get(key)); - while (step.hasNext()) { - traverserSet.add(step.next()); - } + step.forEachRemaining(toProcessTraversers::add); // if it was a reducing barrier step, reset the barrier to its seed value if (step instanceof ReducingBarrierStep) memory.set(step.getId(), ((ReducingBarrierStep) step).getSeedSupplier().get()); @@ -100,34 +96,33 @@ final class MasterExecutor { final TraverserSet<Object> haltedTraversers, final Class haltedTraverserFactory) { - while (!toProcessTraversers.isEmpty()) { final TraverserSet<Object> localActiveTraversers = new TraverserSet<>(); Step<Object, Object> previousStep = EmptyStep.instance(); Step<Object, Object> currentStep = EmptyStep.instance(); - final Iterator<Traverser.Admin<Object>> traversers = IteratorUtils.removeOnNext(toProcessTraversers.iterator()); + // these are traversers that are at the master traversal and will either halt here or be distributed back to the workers as needed + final Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator(); while (traversers.hasNext()) { final Traverser.Admin<Object> traverser = traversers.next(); - traverser.set(DetachedFactory.detach(traverser.get(), true)); + traversers.remove(); + traverser.set(DetachedFactory.detach(traverser.get(), true)); // why? traverser.setSideEffects(traversal.get().getSideEffects()); - if (traverser.isHalted()) { + if (traverser.isHalted()) haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory)); - } else if (isRemoteTraverser(traverser, traversalMatrix)) { // this is so that patterns like order().name work as expected. 
+ else if (isRemoteTraverser(traverser, traversalMatrix)) // this is so that patterns like order().name work as expected. try and stay local as long as possible remoteActiveTraversers.add(traverser.detach()); - } else { + else { currentStep = traversalMatrix.getStepById(traverser.getStepId()); if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep)) { while (previousStep.hasNext()) { final Traverser.Admin<Object> result = previousStep.next(); - if (result.isHalted()) { + if (result.isHalted()) haltedTraversers.add(MasterExecutor.detach(result, haltedTraverserFactory)); - } else { - if (isRemoteTraverser(result, traversalMatrix)) { - remoteActiveTraversers.add(result.detach()); - } else - localActiveTraversers.add(result); - } + else if (isRemoteTraverser(result, traversalMatrix)) + remoteActiveTraversers.add(result.detach()); + else + localActiveTraversers.add(result); } } currentStep.addStart(traverser); @@ -137,14 +132,12 @@ final class MasterExecutor { if (!(currentStep instanceof EmptyStep)) { while (currentStep.hasNext()) { final Traverser.Admin<Object> traverser = currentStep.next(); - if (traverser.isHalted()) { + if (traverser.isHalted()) haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory)); - } else { - if (isRemoteTraverser(traverser, traversalMatrix)) { - remoteActiveTraversers.add(traverser.detach()); - } else - localActiveTraversers.add(traverser); - } + else if (isRemoteTraverser(traverser, traversalMatrix)) + remoteActiveTraversers.add(traverser.detach()); + else + localActiveTraversers.add(traverser); } } assert toProcessTraversers.isEmpty(); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java ---------------------------------------------------------------------- diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java index d4daaac..4479306 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java @@ -297,7 +297,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet< if (voteToHalt) { // local traverser sets to process final TraverserSet<Object> toProcessTraversers = new TraverserSet<>(); + // traversers that need to be sent back to the workers (no longer can be processed locally by the master traversal) final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>(); + // halted traversers that have completed their journey final TraverserSet<Object> haltedTraversers = memory.get(HALTED_TRAVERSERS); // get all barrier traversers final Set<String> completedBarriers = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java index f833b6f..5798af0 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal; import org.apache.tinkerpop.gremlin.process.computer.Memory; import 
org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.Messenger; -import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger; import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; @@ -36,7 +35,6 @@ import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.Attachable; -import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceElement; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; @@ -65,19 +63,18 @@ final class WorkerExecutor { final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS); final TraverserSet<Object> activeTraversers = new TraverserSet<>(); final TraverserSet<Object> toProcessTraversers = new TraverserSet<>(); - final boolean isTesting = Boolean.valueOf(System.getProperty("is.testing", "false")); //////////////////////////////// // GENERATE LOCAL TRAVERSERS // /////////////////////////////// - // some memory systems are interacted by multiple threads and thus, concurrent modification can happen at iterator.remove() + // these are traversers that are going from OLTP (master) to OLAP (workers) + // these traversers were broadcasted from the master traversal to the workers for attachment + final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS); + // some memory systems are interacted with by multiple threads and thus, concurrent modification can happen at iterator.remove(). // its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it. 
- // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing - synchronized (memory) { - // these are traversers that are going from OLTP to OLAP - // these traversers were broadcasted from the master traversal to the workers for attachment - final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS); + // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing. + synchronized (maybeActiveTraversers) { final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator(); while (iterator.hasNext()) { final Traverser.Admin<Object> traverser = iterator.next(); @@ -97,20 +94,19 @@ final class WorkerExecutor { traverser.setSideEffects(traversalSideEffects); toProcessTraversers.add(traverser); }); + assert previousActiveTraversers.isEmpty(); + // remove the property to save space vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove(); }); // these are traversers that have been messaged to the vertex from another vertex final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages(); while (messages.hasNext()) { IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> { - // this is internal testing to ensure that messaged elements are always ReferenceXXX and not DetachedXXX (related to HaltedTraverserFactoryStrategy) - if (isTesting && !(messenger instanceof SingleMessenger) && traverser.get() instanceof Element) - assert traverser.get() instanceof ReferenceElement; if (traverser.isHalted()) { if (returnHaltedTraversers) memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory))); else - haltedTraversers.add(traverser); + haltedTraversers.add(traverser.detach()); } else { // traverser is not halted and thus, should be processed locally traverser.attach(Attachable.Method.get(vertex)); @@ -132,6 +128,7 
@@ final class WorkerExecutor { final Traverser.Admin<Object> traverser = traversers.next(); traversers.remove(); final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId()); + // try and fill up the current step as much as possible with traversers to get a bulking optimization if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep)) WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory); currentStep.addStart(traverser); @@ -150,12 +147,11 @@ final class WorkerExecutor { if (traverser.get() instanceof Element || traverser.get() instanceof Property) { // GRAPH OBJECT // if the element is remote, then message, else store it locally for re-processing final Vertex hostingVertex = WorkerExecutor.getHostingVertex(traverser.get()); - if (!vertex.equals(hostingVertex)) { // necessary for path access - voteToHalt.set(false); + if (!vertex.equals(hostingVertex)) { // if its host is not the current vertex, then send the traverser to the hosting vertex + voteToHalt.set(false); // if message is passed, then don't vote to halt messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser.detach())); } else { - if (traverser.get() instanceof Attachable) // necessary for path access to local object - traverser.attach(Attachable.Method.get(vertex)); + traverser.attach(Attachable.Method.get(vertex)); toProcessTraversers.add(traverser); } } else // STANDARD OBJECT @@ -195,7 +191,7 @@ final class WorkerExecutor { else haltedTraversers.add(traverser.detach()); } else - localBarrierTraversers.add(traverser); + localBarrierTraversers.add(traverser.detach()); }); } memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId()))); 
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java index d643cd4..d0f50d0 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java @@ -109,7 +109,7 @@ import java.util.stream.Stream; test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals", method = "g_V_matchXa_0sungBy_b__a_0sungBy_c__b_writtenBy_d__c_writtenBy_e__d_hasXname_George_HarisonX__e_hasXname_Bob_MarleyXX", reason = "Hadoop-Gremlin is OLAP-oriented and for OLTP operations, linear-scan joins are required. 
This particular tests takes many minutes to execute.", - computers = {"ALL"}) + computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"}) // this is a nasty long test, just do it once in Java MatchTest @Graph.OptOut( test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals", method = "g_V_matchXa_0sungBy_b__a_0writtenBy_c__b_writtenBy_d__c_sungBy_d__d_hasXname_GarciaXX", http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java index af0b3b7..43bc94e 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java @@ -23,10 +23,12 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorati import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge; +import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPath; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; import 
org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferencePath; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty; @@ -64,6 +66,7 @@ public class HaltedTraverserFactoryStrategyTest { g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(DetachedProperty.class, property.getClass())); g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass())); g.V().out().out().forEachRemaining(vertex -> assertEquals(DetachedVertex.class, vertex.getClass())); + g.V().out().out().path().forEachRemaining(path -> assertEquals(DetachedPath.class, path.getClass())); } @Test @@ -77,7 +80,8 @@ public class HaltedTraverserFactoryStrategyTest { g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class, property.getClass())); g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass())); g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass())); - // + g.V().out().out().path().forEachRemaining(path -> assertEquals(ReferencePath.class, path.getClass())); + // the default should be reference elements g = graph.traversal().withComputer(); g.V().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass())); g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(ReferenceVertexProperty.class, vertexProperty.getClass())); @@ -86,6 +90,7 @@ public class HaltedTraverserFactoryStrategyTest { g.V().out().outE().properties("weight").forEachRemaining(property -> 
assertEquals(ReferenceProperty.class, property.getClass())); g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass())); g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass())); + g.V().out().out().path().forEachRemaining(path -> assertEquals(ReferencePath.class, path.getClass())); } }
