lots of documentation on TraversalVertexProgram and got HaltedTraverserFactoryStrategy tested and optimized.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3978e7bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3978e7bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3978e7bd Branch: refs/heads/TINKERPOP-1310 Commit: 3978e7bda5f4896ea1c2815c889e206afcbaccaa Parents: 19f16f1 Author: Marko A. Rodriguez <[email protected]> Authored: Wed May 25 12:25:31 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Wed May 25 12:25:31 2016 -0600 ---------------------------------------------------------------------- .../computer/traversal/MasterExecutor.java | 6 +- .../computer/traversal/WorkerExecutor.java | 56 +++++++----- .../HaltedTraverserFactoryStrategy.java | 13 ++- .../HaltedTraverserFactoryStrategyTest.java | 91 ++++++++++++++++++++ 4 files changed, 140 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java index b5ec12b..b994f1e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java @@ -58,6 +58,8 @@ final class MasterExecutor { } + // the halted traversers can either be reference or detached elements -- this is good for determining how much data around the element the user wants to get + // see HaltedTraverserFactoryStrategy for how this is 
all connected protected static <R> Traverser.Admin<R> detach(final Traverser.Admin<R> traverser, final Class haltedTraverserFactory) { if (haltedTraverserFactory.equals(DetachedFactory.class)) traverser.set(DetachedFactory.detach(traverser.get(), true)); @@ -68,11 +70,12 @@ final class MasterExecutor { return traverser; } + // handle traversers and data that were sent from the workers to the master traversal via memory protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> traverserSet, final Set<String> completedBarriers) { if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) { for (final String key : memory.<Set<String>>get(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) { final Step<Object, Object> step = traversalMatrix.getStepById(key); - if (null == step) continue; + if (null == step) continue; // why? how can this happen? assert step instanceof Barrier; completedBarriers.add(step.getId()); if (!(step instanceof LocalBarrier)) { // local barriers don't do any processing on the master traversal (they just lock on the workers) @@ -81,6 +84,7 @@ final class MasterExecutor { while (step.hasNext()) { traverserSet.add(step.next()); } + // if it was a reducing barrier step, reset the barrier to its seed value if (step instanceof ReducingBarrierStep) memory.set(step.getId(), ((ReducingBarrierStep) step).getSeedSupplier().get()); } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java index 5bc3da9..f833b6f 100644 --- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal; import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.Messenger; +import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger; import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; @@ -35,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.Attachable; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceElement; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; @@ -58,30 +60,37 @@ final class WorkerExecutor { final Memory memory, final boolean returnHaltedTraversers, final Class haltedTraverserFactory) { - final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects(); final AtomicBoolean voteToHalt = new AtomicBoolean(true); final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS); final TraverserSet<Object> activeTraversers = new TraverserSet<>(); final TraverserSet<Object> toProcessTraversers = new TraverserSet<>(); + final boolean isTesting = Boolean.valueOf(System.getProperty("is.testing", "false")); //////////////////////////////// // GENERATE LOCAL TRAVERSERS // /////////////////////////////// - // these are traversers that 
are going from OLTP to OLAP - final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS); - final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator(); - while (iterator.hasNext()) { - final Traverser.Admin<Object> traverser = iterator.next(); - if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) { - // iterator.remove(); ConcurrentModificationException - traverser.attach(Attachable.Method.get(vertex)); - traverser.setSideEffects(traversalSideEffects); - toProcessTraversers.add(traverser); + // some memory systems are interacted by multiple threads and thus, concurrent modification can happen at iterator.remove() + // its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it. + // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing + synchronized (memory) { + // these are traversers that are going from OLTP to OLAP + // these traversers were broadcasted from the master traversal to the workers for attachment + final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS); + final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator(); + while (iterator.hasNext()) { + final Traverser.Admin<Object> traverser = iterator.next(); + if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) { + iterator.remove(); + traverser.attach(Attachable.Method.get(vertex)); + traverser.setSideEffects(traversalSideEffects); + toProcessTraversers.add(traverser); + } } } // these are traversers that exist from from a local barrier + // these traversers will simply saved at the local vertex while the master traversal synchronized the barrier vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> { 
IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> { traverser.attach(Attachable.Method.get(vertex)); @@ -90,24 +99,25 @@ final class WorkerExecutor { }); vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove(); }); - // these are traversers that have been messaged to the vertex + // these are traversers that have been messaged to the vertex from another vertex final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages(); while (messages.hasNext()) { - final Iterator<Traverser.Admin<Object>> traversers = messages.next().iterator(); - while (traversers.hasNext()) { - final Traverser.Admin<Object> traverser = traversers.next(); - traversers.remove(); + IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> { + // this is internal testing to ensure that messaged elements are always ReferenceXXX and not DetachedXXX (related to HaltedTraverserFactoryStrategy) + if (isTesting && !(messenger instanceof SingleMessenger) && traverser.get() instanceof Element) + assert traverser.get() instanceof ReferenceElement; if (traverser.isHalted()) { if (returnHaltedTraversers) memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory))); else haltedTraversers.add(traverser); } else { + // traverser is not halted and thus, should be processed locally traverser.attach(Attachable.Method.get(vertex)); traverser.setSideEffects(traversalSideEffects); toProcessTraversers.add(traverser); } - } + }); } /////////////////////////////// @@ -116,7 +126,6 @@ final class WorkerExecutor { // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()). while (!toProcessTraversers.isEmpty()) { - // process local traversers and if alive, repeat, else halt. 
Step<Object, Object> previousStep = EmptyStep.instance(); Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator(); while (traversers.hasNext()) { @@ -129,6 +138,7 @@ final class WorkerExecutor { previousStep = currentStep; } WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory); + // all processed traversers should be either halted or active assert toProcessTraversers.isEmpty(); // process all the local objects and send messages or store locally again if (!activeTraversers.isEmpty()) { @@ -136,6 +146,7 @@ final class WorkerExecutor { while (traversers.hasNext()) { final Traverser.Admin<Object> traverser = traversers.next(); traversers.remove(); + // decide whether to message the traverser or to process it locally if (traverser.get() instanceof Element || traverser.get() instanceof Property) { // GRAPH OBJECT // if the element is remote, then message, else store it locally for re-processing final Vertex hostingVertex = WorkerExecutor.getHostingVertex(traverser.get()); @@ -167,9 +178,10 @@ final class WorkerExecutor { if (step instanceof Bypassing) ((Bypassing) step).setBypass(true); if (step instanceof LocalBarrier) { + // local barrier traversers are stored on the vertex until the master traversal synchronizes the system final LocalBarrier<Object> barrier = (LocalBarrier<Object>) step; - final TraverserSet<Object> traverserSet = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>()); - vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, traverserSet); + final TraverserSet<Object> localBarrierTraversers = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>()); + vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, localBarrierTraversers); while (barrier.hasNextBarrier()) { final TraverserSet<Object> barrierSet = barrier.nextBarrier(); 
IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> { @@ -183,7 +195,7 @@ final class WorkerExecutor { else haltedTraversers.add(traverser.detach()); } else - traverserSet.add(traverser.detach()); + localBarrierTraversers.add(traverser); }); } memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId()))); @@ -198,7 +210,7 @@ final class WorkerExecutor { step.forEachRemaining(traverser -> { if (traverser.isHalted() && // if its a ReferenceFactory (one less iteration) - ((returnHaltedTraversers || haltedTraverserFactory == ReferenceFactory.class) && + ((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserFactory) && (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) || getHostingVertex(traverser.get()).equals(vertex))) { if (returnHaltedTraversers) http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java index b046986..c2f3855 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java @@ -22,12 +22,14 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorat import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; import 
org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory; import java.util.Collections; +import java.util.List; import java.util.Set; /** @@ -42,11 +44,16 @@ public final class HaltedTraverserFactoryStrategy extends AbstractTraversalStrat } public void apply(final Traversal.Admin<?, ?> traversal) { - TraversalHelper.getStepsOfAssignableClass(TraversalVertexProgramStep.class, traversal) - .forEach(step -> step.setHaltedTraverserFactory(this.haltedTraverserFactory)); + // only the root traversal should be processed + if (traversal.getParent() instanceof EmptyStep) { + final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfAssignableClass(TraversalVertexProgramStep.class, traversal); + // only the last step (the one returning data) needs to have a non-reference traverser factory + if (!steps.isEmpty()) + steps.get(steps.size() - 1).setHaltedTraverserFactory(this.haltedTraverserFactory); + } } - public static HaltedTraverserFactoryStrategy detach() { + public static HaltedTraverserFactoryStrategy detached() { return new HaltedTraverserFactoryStrategy(DetachedFactory.class); } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java ---------------------------------------------------------------------- diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java new file mode 100644 index 0000000..af0b3b7 --- /dev/null +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.tinkergraph.process.computer.traversal.strategy.decoration; + +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.HaltedTraverserFactoryStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge; +import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty; +import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; +import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public class HaltedTraverserFactoryStrategyTest { + + @Before + public void setup() { + // necessary as ComputerResult step for testing purposes attaches Attachables + System.setProperty("is.testing", "false"); + } + + @After + public void shutdown() { + System.setProperty("is.testing", "true"); + } + + @Test + public void shouldReturnDetachedElements() { + Graph graph = TinkerFactory.createModern(); + GraphTraversalSource g = graph.traversal().withComputer().withStrategies(HaltedTraverserFactoryStrategy.detached()); + g.V().out().forEachRemaining(vertex -> assertEquals(DetachedVertex.class, vertex.getClass())); + g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(DetachedVertexProperty.class, vertexProperty.getClass())); + g.V().out().values("name").forEachRemaining(value -> assertEquals(String.class, value.getClass())); + g.V().out().outE().forEachRemaining(edge -> assertEquals(DetachedEdge.class, edge.getClass())); + g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(DetachedProperty.class, property.getClass())); + g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass())); + g.V().out().out().forEachRemaining(vertex -> assertEquals(DetachedVertex.class, vertex.getClass())); + } + + @Test + public void shouldReturnReferenceElements() { + Graph graph = TinkerFactory.createModern(); + GraphTraversalSource g = graph.traversal().withComputer().withStrategies(HaltedTraverserFactoryStrategy.reference()); + g.V().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass())); + g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(ReferenceVertexProperty.class, vertexProperty.getClass())); + g.V().out().values("name").forEachRemaining(value -> assertEquals(String.class, value.getClass())); + g.V().out().outE().forEachRemaining(edge -> assertEquals(ReferenceEdge.class, 
edge.getClass())); + g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class, property.getClass())); + g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass())); + g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass())); + // + g = graph.traversal().withComputer(); + g.V().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass())); + g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(ReferenceVertexProperty.class, vertexProperty.getClass())); + g.V().out().values("name").forEachRemaining(value -> assertEquals(String.class, value.getClass())); + g.V().out().outE().forEachRemaining(edge -> assertEquals(ReferenceEdge.class, edge.getClass())); + g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class, property.getClass())); + g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass())); + g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass())); + } + +}
