added comments in SPVP
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/eae0184a Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/eae0184a Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/eae0184a Branch: refs/heads/TINKERPOP-1990 Commit: eae0184a636ed296a2b67bea12a64c93e683df5d Parents: 62d0dbc Author: Daniel Kuppitz <daniel_kupp...@hotmail.com> Authored: Mon Jul 30 09:40:43 2018 -0700 Committer: Daniel Kuppitz <daniel_kupp...@hotmail.com> Committed: Wed Aug 1 12:26:30 2018 -0700 ---------------------------------------------------------------------- .../search/path/ShortestPathVertexProgram.java | 32 +++++++++++++++++++- .../step/map/ShortestPathVertexProgramStep.java | 3 ++ 2 files changed, 34 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/eae0184a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java index 549dff9..1949c53 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java @@ -152,6 +152,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed } } + // restore halted traversers from the configuration and build an index for direct access this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration); this.haltedTraversersIndex = new IndexedTraverserSet<>(v -> v); for (final Traverser.Admin<Vertex> traverser : this.haltedTraversers) { @@ -254,10 +255,15 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed if (memory.isInitialIteration()) { + // Use the first iteration to copy halted traversers from the halted traverser index to the respective + // vertices. This way the rest of the code can be simplified and always expect the HALTED_TRAVERSERS + // property to be available (if halted traversers exist for this vertex). copyHaltedTraversersFromMemory(vertex); + // ignore vertices that don't pass the start-vertex filter if (!isStartVertex(vertex)) return; + // start to track paths for all valid start-vertices final Map<Vertex, Pair<Number, Set<Path>>> paths = new HashMap<>(); final Path path; final Set<Path> pathSet = new HashSet<>(); @@ -267,12 +273,14 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed vertex.property(VertexProperty.Cardinality.single, PATHS, paths); + // send messages to valid adjacent vertices processEdges(vertex, path, 0, messenger); voteToHalt = false; } else { + // load existing paths to this vertex and extend them based on messages received from adjacent vertices final Map<Vertex, Pair<Number, Set<Path>>> paths = vertex.<Map<Vertex, Pair<Number, Set<Path>>>>property(PATHS).orElseGet(HashMap::new); final Iterator<Triplet<Path, Edge, Number>> iterator = messenger.receiveMessages(); @@ -286,6 +294,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed Path newPath = null; + // already know a path coming from this source vertex? if (paths.containsKey(sourceVertex)) { final Number currentShortestDistance = paths.get(sourceVertex).getValue0(); @@ -294,19 +303,26 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed if (cmp <= 0) { newPath = extendPath(sourcePath, triplet.getValue1(), vertex); if (cmp < 0) { + // if the path length is smaller than the current shortest path's length, replace the + // current set of shortest paths final Set<Path> pathSet = new HashSet<>(); pathSet.add(newPath); paths.put(sourceVertex, Pair.with(distance, pathSet)); } else { + // if the path length is equal to the current shortest path's length, add the new path + // to the set of shortest paths paths.get(sourceVertex).getValue1().add(newPath); } } } else if (!exceedsMaxDistance(distance)) { + // store the new path as the shortest path from the source vertex to the current vertex final Set<Path> pathSet = new HashSet<>(); pathSet.add(newPath = extendPath(sourcePath, triplet.getValue1(), vertex)); paths.put(sourceVertex, Pair.with(distance, pathSet)); } + // if a new path was found, send messages to adjacent vertices, otherwise do nothing as there's no + // chance to find any new paths going forward if (newPath != null) { vertex.property(VertexProperty.Cardinality.single, PATHS, paths); processEdges(vertex, newPath, distance, messenger); @@ -315,6 +331,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed } } + // VOTE_TO_HALT will be set to true if an iteration hasn't found any new paths memory.add(VOTE_TO_HALT, voteToHalt); } @@ -327,12 +344,15 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed if (voteToHalt) { final int state = memory.get(STATE); if (state == COLLECT_PATHS) { + // After paths were collected, + // a) the VP is done in standalone mode (paths will be in memory) or + // b) the halted traversers will be updated in order to have the paths available in the traversal if (this.standalone) return true; memory.set(STATE, UPDATE_HALTED_TRAVERSERS); return false; } if (state == UPDATE_HALTED_TRAVERSERS) return true; - else memory.set(STATE, COLLECT_PATHS); + else memory.set(STATE, COLLECT_PATHS); // collect paths if no new paths were found return false; } else { memory.set(VOTE_TO_HALT, true); @@ -399,11 +419,13 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed } private boolean isStartVertex(final Vertex vertex) { + // use the sourceVertexFilterTraversal if the VP is running in standalone mode (not part of a traversal) if (this.standalone) { final Traversal.Admin<Vertex, ?> filterTraversal = this.sourceVertexFilterTraversal.getPure(); filterTraversal.addStart(filterTraversal.getTraverserGenerator().generate(vertex, filterTraversal.getStartStep(), 1)); return filterTraversal.hasNext(); } + // ...otherwise use halted traversers to determine whether this is a start vertex return vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent(); } @@ -429,6 +451,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed if (otherV.equals(vertex)) otherV = edge.outVertex(); + // only send message if the adjacent vertex is not yet part of the current path if (!currentPath.objects().contains(otherV)) { messenger.sendMessage(MessageScope.Global.of(otherV), Triplet.with(currentPath, this.includeEdges ? edge : null, @@ -441,6 +464,8 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed if (isStartVertex(vertex)) { final List<Path> paths = memory.get(SHORTEST_PATHS); if (vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent()) { + // replace the current set of halted traversers with new new traversers that hold the shortest paths + // found for this vertex final TraverserSet<Vertex> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS); final TraverserSet<Path> newHaltedTraversers = new TraverserSet<>(); for (final Traverser.Admin<Vertex> traverser : haltedTraversers) { @@ -471,6 +496,11 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed && NumberHelper.compare(distance, this.maxDistance) > 0; } + /** + * Move any valid path into the VP's memory. + * @param vertex The current vertex. + * @param memory The VertexProgram's memory. + */ private void collectShortestPaths(final Vertex vertex, final Memory memory) { final VertexProperty<Map<Vertex, Pair<Number, Set<Path>>>> pathProperty = vertex.property(PATHS); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/eae0184a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java index 5a0f338..e9a09e2 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java @@ -136,6 +136,9 @@ public final class ShortestPathVertexProgramStep extends VertexProgramStep imple ProgramVertexProgramStep.ROOT_TRAVERSAL, rootTraversalValue, ProgramVertexProgramStep.STEP_ID, this.id); + // There are two locations in which halted traversers can be stored: in memory or as vertex properties. In the + // former case they need to be copied to this VertexProgram's configuration as the VP won't have access to the + // previous VP's memory. if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) { final TraverserSet<?> haltedTraversers = memory.get(TraversalVertexProgram.HALTED_TRAVERSERS); if (!haltedTraversers.isEmpty()) {