TINKERPOP-1870: Extends TraverserSet to have a Vertex index for remote traversers. That replaces the linear search in the reversal traversal iterator with a hashtable lookup.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f1ac225f Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f1ac225f Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f1ac225f Branch: refs/heads/TINKERPOP-1447 Commit: f1ac225f7bb99c88bb81a058449ad7c224210207 Parents: 231bbb6 Author: artemaliev <artem.aliev@gmail,com> Authored: Mon Jan 15 18:23:49 2018 +0300 Committer: artemaliev <artem.aliev@gmail,com> Committed: Fri Jan 19 14:09:53 2018 +0300 ---------------------------------------------------------------------- gremlin-core/pom.xml | 10 ++- .../traversal/TraversalVertexProgram.java | 9 +-- .../computer/traversal/WorkerExecutor.java | 14 ++-- .../traverser/util/VertexTraverserSet.java | 71 ++++++++++++++++++++ .../gremlin/structure/io/gryo/GryoVersion.java | 6 +- .../process/traversal/step/map/ProgramTest.java | 3 +- 6 files changed, 96 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1ac225f/gremlin-core/pom.xml ---------------------------------------------------------------------- diff --git a/gremlin-core/pom.xml b/gremlin-core/pom.xml index ad48cd8..2aaf0e0 100644 --- a/gremlin-core/pom.xml +++ b/gremlin-core/pom.xml @@ -35,6 +35,10 @@ limitations under the License. <artifactId>commons-configuration</artifactId> </dependency> <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + <dependency> <groupId>org.yaml</groupId> <artifactId>snakeyaml</artifactId> <version>${snakeyaml.version}</version> @@ -95,12 +99,6 @@ limitations under the License. 
<artifactId>hamcrest-all</artifactId> <scope>test</scope> </dependency> - <!-- needed to test GraphFactory and xml based config via optional dependency in apache configurations --> - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>com.google.testing.compile</groupId> <artifactId>compile-testing</artifactId> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1ac225f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java index 6f36306..d4c605b 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java @@ -53,6 +53,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.HaltedTraverserStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.VertexTraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal; import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics; @@ -229,13 +230,13 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet< 
toProcessTraversers.add(traverser); }); assert this.haltedTraversers.isEmpty(); - final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>(); + final VertexTraverserSet<Object> remoteActiveTraversers = new VertexTraverserSet<>(); MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, this.haltedTraversers, this.haltedTraverserStrategy); memory.set(HALTED_TRAVERSERS, this.haltedTraversers); memory.set(ACTIVE_TRAVERSERS, remoteActiveTraversers); } else { memory.set(HALTED_TRAVERSERS, new TraverserSet<>()); - memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>()); + memory.set(ACTIVE_TRAVERSERS, new VertexTraverserSet<>()); } // local variable will no longer be used so null it for GC this.haltedTraversers = null; @@ -315,12 +316,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet< MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.TERMINATE); final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT); memory.set(VOTE_TO_HALT, true); - memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>()); + memory.set(ACTIVE_TRAVERSERS, new VertexTraverserSet<>()); if (voteToHalt) { // local traverser sets to process final TraverserSet<Object> toProcessTraversers = new TraverserSet<>(); // traversers that need to be sent back to the workers (no longer can be processed locally by the master traversal) - final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>(); + final VertexTraverserSet<Object> remoteActiveTraversers = new VertexTraverserSet<>(); // halted traversers that have completed their journey final TraverserSet<Object> haltedTraversers = memory.get(HALTED_TRAVERSERS); // get all barrier traversers http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1ac225f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java 
---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java index e6e73d0..a875afe 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java @@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.HaltedTraverserStrategy; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.VertexTraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; @@ -40,6 +41,7 @@ import org.apache.tinkerpop.gremlin.structure.util.Attachable; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -73,17 +75,19 @@ final class WorkerExecutor { // MASTER ACTIVE // these are traversers that are going from OLTP (master) to OLAP (workers) // these traversers were broadcasted from the master traversal to the workers for attachment - final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS); + final VertexTraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS); // some memory systems are interacted with by 
multiple threads and thus, concurrent modification can happen at iterator.remove(). // its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it. // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing. synchronized (maybeActiveTraversers) { if (!maybeActiveTraversers.isEmpty()) { - final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator(); - while (iterator.hasNext()) { - final Traverser.Admin<Object> traverser = iterator.next(); - if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) { + final Collection<Traverser.Admin<Object>> traversers = maybeActiveTraversers.get(vertex); + if (traversers != null) { + final Iterator<Traverser.Admin<Object>> iterator = traversers.iterator(); + while (iterator.hasNext()) { + final Traverser.Admin<Object> traverser = iterator.next(); iterator.remove(); + maybeActiveTraversers.remove(traverser); traverser.attach(Attachable.Method.get(vertex)); traverser.setSideEffects(traversalSideEffects); toProcessTraversers.add(traverser); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1ac225f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/VertexTraverserSet.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/VertexTraverserSet.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/VertexTraverserSet.java new file mode 100644 index 0000000..05ba3a0 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/VertexTraverserSet.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.process.traversal.traverser.util; + +import org.apache.commons.collections.map.MultiValueMap; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Property; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import java.util.Collection; + +public class VertexTraverserSet<S> extends TraverserSet<S> { + // that should be MultiValueMap<Vertex, Traverser.Admin<S>> but 3.2.2 apache collection do not use generics + private final MultiValueMap vertexIndex = new MultiValueMap(); + + @Override + public void clear() { + vertexIndex.clear(); + super.clear(); + } + + @Override + public boolean add(final Traverser.Admin<S> traverser) { + if (super.add(traverser)) { + vertexIndex.put(getHostingVertex(traverser.get()), traverser); + return true; + } else { + return false; + } + } + + public Collection<Traverser.Admin<S>> get(Vertex v) { + return vertexIndex.getCollection(v); + } + + @Override + public boolean offer(final Traverser.Admin<S> traverser) { + return this.add(traverser); + } + + private static Vertex getHostingVertex(final Object object) { + Object obj = object; + while (true) { + if (obj instanceof Vertex) + return 
(Vertex) obj; + else if (obj instanceof Edge) + return ((Edge) obj).outVertex(); + else if (obj instanceof Property) + obj = ((Property) obj).element(); + else + throw new IllegalStateException("The host of the object is unknown: " + obj.toString() + ':' + obj.getClass().getCanonicalName()); + } + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1ac225f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java index f7e5a1d..6d76ce8 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java @@ -52,6 +52,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_S_SE_SL_ import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_OB_S_SE_SL_Traverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_Traverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.ProjectedTraverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.VertexTraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalMetrics; import org.apache.tinkerpop.gremlin.process.traversal.util.ImmutableMetrics; @@ -204,7 +205,7 @@ public enum GryoVersion { add(GryoTypeReg.of(HashMap.class, 11)); add(GryoTypeReg.of(HashMap.Entry.class, 16)); add(GryoTypeReg.of(Types.HASH_MAP_NODE, 92)); - add(GryoTypeReg.of(Types.HASH_MAP_TREE_NODE, 170)); // ***LAST ID*** + add(GryoTypeReg.of(Types.HASH_MAP_TREE_NODE, 170)); add(GryoTypeReg.of(KryoSerializable.class, 36)); 
add(GryoTypeReg.of(LinkedHashMap.class, 47)); add(GryoTypeReg.of(LinkedHashSet.class, 71)); @@ -268,6 +269,7 @@ public enum GryoVersion { add(GryoTypeReg.of(MultiComparator.class, 165)); add(GryoTypeReg.of(TraverserSet.class, 58)); + add(GryoTypeReg.of(Tree.class, 61)); add(GryoTypeReg.of(HashSet.class, 62)); add(GryoTypeReg.of(BulkSet.class, 64)); @@ -305,6 +307,8 @@ public enum GryoVersion { add(GryoTypeReg.of(RangeGlobalStep.RangeBiOperator.class, 114)); add(GryoTypeReg.of(OrderGlobalStep.OrderBiOperator.class, 118, new JavaSerializer())); add(GryoTypeReg.of(ProfileStep.ProfileBiOperator.class, 119)); + // skip 171, 172 to sync with tp33 + add(GryoTypeReg.of(VertexTraverserSet.class, 173)); // ***LAST ID*** }}; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1ac225f/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java index 5f548da..c50109e 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java @@ -43,6 +43,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator; import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.VertexTraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.EmptyTraversal; import 
org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal; @@ -193,7 +194,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest { assertEquals(2, map.size()); assertTrue(map.values().contains(3l)); assertTrue(map.values().contains(1l)); - final TraverserSet<Object> activeTraversers = new TraverserSet<>(); + final VertexTraverserSet<Object> activeTraversers = new VertexTraverserSet<>(); map.keySet().forEach(vertex -> activeTraversers.add(this.haltedTraversers.peek().split(vertex, EmptyStep.instance()))); this.haltedTraversers.clear(); this.checkSideEffects();
