fixed HALTED_TRAVERSER bug where traversers were not at their 'resting 
location'. This only shows up in multi-OLAP job chains. HALTED_TRAVERSERS 
(master traversal) are now propagated via Configuration and NOT via sideEffects. 
This is more elegant and there are now helper methods in TraversalVertexProgram 
to make getting master halted traversers easy. Updated the-traversal.asciidoc to 
reflect how to use ProgramStep with it. Had to change a method signature in 
VertexComputing to account for this... it's slight (breaking), but no one is 
using this right now, I'm sure of it. And if they are, the change is trivial, 
just add a new argument to a method signature and ignore it to make it be like 
how it is in 3.2.0. ImmutablePath.TailPath was not serializable and this caused 
issues for HALTED_TRAVERSERS as well. Everything else fixed up.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4ee30289
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4ee30289
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4ee30289

Branch: refs/heads/TINKERPOP-1308
Commit: 4ee302899da37a0ad0ebab6bc4954ed39b69d6d9
Parents: c61095f
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Wed May 18 16:15:40 2016 -0600
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Wed May 18 16:15:40 2016 -0600

----------------------------------------------------------------------
 docs/src/reference/the-traversal.asciidoc       | 23 +++++++--
 .../process/computer/GiraphGraphComputer.java   |  6 +++
 .../traversal/MemoryTraversalSideEffects.java   |  1 -
 .../traversal/TraversalVertexProgram.java       | 50 +++++++++++++++++---
 .../computer/traversal/TraverserExecutor.java   | 21 +++++---
 .../traversal/step/VertexComputing.java         |  8 +++-
 .../step/map/PageRankVertexProgramStep.java     |  7 +--
 .../step/map/PeerPressureVertexProgramStep.java | 11 ++++-
 .../step/map/ProgramVertexProgramStep.java      | 10 +++-
 .../step/map/TraversalVertexProgramStep.java    | 13 +++--
 .../traversal/step/map/VertexProgramStep.java   | 21 +++-----
 .../optimization/GraphFilterStrategy.java       |  3 +-
 .../traversal/step/util/ImmutablePath.java      |  3 +-
 .../step/map/GroovyPageRankTest.groovy          |  5 ++
 .../traversal/step/map/PageRankTest.java        | 22 +++++++++
 .../process/traversal/step/map/ProgramTest.java | 38 ++++++++-------
 .../process/computer/SparkGraphComputer.java    |  7 ++-
 .../optimization/SparkInterceptorStrategy.java  |  5 +-
 .../SparkSingleIterationStrategy.java           |  4 +-
 19 files changed, 185 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/docs/src/reference/the-traversal.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/the-traversal.asciidoc 
b/docs/src/reference/the-traversal.asciidoc
index 71708e2..3394e90 100644
--- a/docs/src/reference/the-traversal.asciidoc
+++ b/docs/src/reference/the-traversal.asciidoc
@@ -1436,6 +1436,8 @@ are provided below.
 
 [source,java]
 ----
+private TraverserSet<Object> haltedTraversers;
+
 public void loadState(final Graph graph, final Configuration configuration) {
   VertexProgram.super.loadState(graph, configuration);
   this.traversal = PureTraversal.loadState(configuration, 
VertexProgramStep.ROOT_TRAVERSAL, graph);
@@ -1444,18 +1446,29 @@ public void loadState(final Graph graph, final 
Configuration configuration) {
   
this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
   // if master-traversal traversers may be propagated, create a memory compute 
key
   
this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS,
 Operator.addAll, false, false));
+  // returns an empty traverser set if there are no halted traversers
+  this.haltedTraversers = 
TraversalVertexProgram.getHaltedTraversers(configuration);
+}
+
+public void storeState(final Configuration configuration) {
+  VertexProgram.super.storeState(configuration);
+  // if halted traversers is null or empty, it does nothing
+  TraversalVertexProgram.storeHaltedTraversers(configuration, 
this.haltedTraversers);
 }
 
 public void setup(final Memory memory) {
-  if(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
-    // haltedTraversers in setup() represent master-traversal traversers
-    // for example, from a traversal of the form 
g.V().groupCount().program(...)
-    TraverserSet<Object> haltedTraversers = 
sideEffects.get(TraversalVertexProgram.HALTED_TRAVERSERS);
-    // do as you please with this information
+  if(null != this.haltedTraversers) {
+    // do what you like with the halted master traversal traversers
   }
+  // once used, no need to keep that information around (master)
+  if(null != this.haltedTraversers)
+    this.haltedTraversers.clear()
 }
 
 public void execute(final Vertex vertex, final Messenger messenger, final 
Memory memory) {
+  // once used, no need to keep that information around (workers)
+  if(null != this.haltedTraversers)
+    this.haltedTraversers.clear()
   if(vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent()) {
     // haltedTraversers in execute() represent worker-traversal traversers
     // for example, from a traversal of the form g.V().out().program(...)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
 
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 6bd2a04..012b9fc 100644
--- 
a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ 
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -58,9 +58,12 @@ import 
org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.File;
 import java.io.NotSerializableException;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
@@ -74,6 +77,7 @@ public final class GiraphGraphComputer extends 
AbstractHadoopGraphComputer imple
     protected GiraphConfiguration giraphConfiguration = new 
GiraphConfiguration();
     private MapMemory memory = new MapMemory();
     private boolean useWorkerThreadsInConfiguration;
+    private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
@@ -112,6 +116,7 @@ public final class GiraphGraphComputer extends 
AbstractHadoopGraphComputer imple
         final BaseConfiguration apacheConfiguration = new BaseConfiguration();
         apacheConfiguration.setDelimiterParsingDisabled(true);
         vertexProgram.storeState(apacheConfiguration);
+        IteratorUtils.fill(apacheConfiguration.getKeys(), 
this.vertexProgramConfigurationKeys);
         ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, 
this.giraphConfiguration);
         this.vertexProgram.getMessageCombiner().ifPresent(combiner -> 
this.giraphConfiguration.setMessageCombinerClass(GiraphMessageCombiner.class));
         return this;
@@ -139,6 +144,7 @@ public final class GiraphGraphComputer extends 
AbstractHadoopGraphComputer imple
             // clear properties that should not be propagated in an OLAP chain
             
apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
             
apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
+            
this.vertexProgramConfigurationKeys.forEach(apacheConfiguration::clearProperty);
 // clear out vertex program specific configurations
             return new 
DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, 
this.resultGraph, this.persist), this.memory.asImmutable());
         }, exec);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
index 1327a7f..23d33f1 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
@@ -192,7 +192,6 @@ public final class MemoryTraversalSideEffects implements 
TraversalSideEffects {
                         traversal.getSideEffects();
         sideEffects.keys().
                 stream().
-                filter(key -> 
!key.equals(TraversalVertexProgram.HALTED_TRAVERSERS)).
                 forEach(key -> keys.add(MemoryComputeKey.of(key, 
sideEffects.getReducer(key), true, false)));
         return keys;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 4df5189..211a5e5 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -33,6 +33,7 @@ import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.Computer
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import 
org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -66,6 +67,7 @@ import 
org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -102,6 +104,7 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
     private PureTraversal<?, ?> traversal;
     private TraversalMatrix<?, ?> traversalMatrix;
     private final Set<MapReduce> mapReducers = new HashSet<>();
+    private TraverserSet<Object> haltedTraversers;
     private boolean returnHaltedTraversers = false;
 
     private TraversalVertexProgram() {
@@ -116,6 +119,32 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
         return this.traversal;
     }
 
+    public static <R> TraverserSet<R> loadHaltedTraversers(final Configuration 
configuration) {
+        if (!configuration.containsKey(HALTED_TRAVERSERS))
+            return new TraverserSet<>();
+
+        final Object object = configuration.getProperty(HALTED_TRAVERSERS) 
instanceof String ?
+                VertexProgramHelper.deserialize(configuration, 
HALTED_TRAVERSERS) :
+                configuration.getProperty(HALTED_TRAVERSERS);
+        if (object instanceof Traverser.Admin)
+            return new TraverserSet<>((Traverser.Admin<R>) object);
+        else {
+            final TraverserSet<R> traverserSet = new TraverserSet<>();
+            traverserSet.addAll((Collection) object);
+            return traverserSet;
+        }
+    }
+
+    public static <R> void storeHaltedTraversers(final Configuration 
configuration, final TraverserSet<R> haltedTraversers) {
+        if (null != haltedTraversers && !haltedTraversers.isEmpty()) {
+            try {
+                VertexProgramHelper.serialize(haltedTraversers, configuration, 
HALTED_TRAVERSERS);
+            } catch (final Exception e) {
+                configuration.setProperty(HALTED_TRAVERSERS, haltedTraversers);
+            }
+        }
+    }
+
     @Override
     public void loadState(final Graph graph, final Configuration 
configuration) {
         if (!configuration.containsKey(TRAVERSAL))
@@ -125,6 +154,8 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
             this.traversal.get().applyStrategies();
         /// traversal is compiled and ready to be introspected
         this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
+        // get any master-traversal halted traversers
+        this.haltedTraversers = 
TraversalVertexProgram.loadHaltedTraversers(configuration);
         // if results will be serialized out, don't save halted traversers 
across the cluster
         this.returnHaltedTraversers =
                 (this.traversal.get().getParent().asStep().getNextStep() 
instanceof ComputerResultStep || // if its just going to stream it out, don't 
distribute
@@ -158,6 +189,7 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
     public void storeState(final Configuration configuration) {
         VertexProgram.super.storeState(configuration);
         this.traversal.storeState(configuration, TRAVERSAL);
+        TraversalVertexProgram.storeHaltedTraversers(configuration, 
this.haltedTraversers);
     }
 
     @Override
@@ -170,19 +202,17 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
         memory.set(MUTATED_MEMORY_KEYS, new HashSet<>());
         memory.set(COMPLETED_BARRIERS, new HashSet<>());
         // if halted traversers are being sent from a previous VertexProgram 
in an OLAP chain (non-distributed traversers), get them into the stream
-        if (sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
-            final TraverserSet<Object> haltedTraversers = 
sideEffects.get(TraversalVertexProgram.HALTED_TRAVERSERS);
+        if (null != this.haltedTraversers) {
             final TraverserSet<Object> toProcessTraversers = new 
TraverserSet<>();
-            
IteratorUtils.removeOnNext(haltedTraversers.iterator()).forEachRemaining(traverser
 -> {
+            
IteratorUtils.removeOnNext(this.haltedTraversers.iterator()).forEachRemaining(traverser
 -> {
                 
traverser.setStepId(this.traversal.get().getStartStep().getId());
                 toProcessTraversers.add(traverser);
             });
             assert haltedTraversers.isEmpty(); // TODO: this should be empty
             final TraverserSet<Object> remoteActiveTraversers = new 
TraverserSet<>();
-            MasterExecutor.processTraversers(this.traversal, 
this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, 
haltedTraversers);
-            memory.set(HALTED_TRAVERSERS, haltedTraversers);
+            MasterExecutor.processTraversers(this.traversal, 
this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, 
this.haltedTraversers);
+            memory.set(HALTED_TRAVERSERS, this.haltedTraversers);
             memory.set(ACTIVE_TRAVERSERS, remoteActiveTraversers);
-            sideEffects.remove(TraversalVertexProgram.HALTED_TRAVERSERS);
         } else {
             memory.set(HALTED_TRAVERSERS, new TraverserSet<>());
             memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>());
@@ -196,6 +226,9 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
 
     @Override
     public void execute(final Vertex vertex, final 
Messenger<TraverserSet<Object>> messenger, final Memory memory) {
+        // if any global halted traversers, simply don't use them as they were 
handled by master setup()
+        if (null != this.haltedTraversers)
+            this.haltedTraversers = null;
         // memory is distributed
         MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), 
memory, MemoryTraversalSideEffects.State.EXECUTE);
         // if a barrier was completed in another worker, it is also completed 
here (ensure distributed barriers are synchronized)
@@ -365,6 +398,11 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
             super(TraversalVertexProgram.class);
         }
 
+        public Builder haltedTraversers(final TraverserSet<Object> 
haltedTraversers) {
+            TraversalVertexProgram.storeHaltedTraversers(this.configuration, 
haltedTraversers);
+            return this;
+        }
+
         public Builder traversal(final TraversalSource traversalSource, final 
String scriptEngine, final String traversalScript, final Object... bindings) {
             return this.traversal(new ScriptTraversal<>(traversalSource, 
scriptEngine, traversalScript, bindings));
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 167924f..f5facdb 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -87,9 +87,16 @@ public final class TraverserExecutor {
             while (traversers.hasNext()) {
                 final Traverser.Admin<Object> traverser = traversers.next();
                 traversers.remove();
-                traverser.attach(Attachable.Method.get(vertex));
-                traverser.setSideEffects(traversalSideEffects);
-                toProcessTraversers.add(traverser);
+                if (traverser.isHalted()) {
+                    if (returnHaltedTraversers)
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, 
new TraverserSet<>(traverser));
+                    else
+                        haltedTraversers.add(traverser);
+                } else {
+                    traverser.attach(Attachable.Method.get(vertex));
+                    traverser.setSideEffects(traversalSideEffects);
+                    toProcessTraversers.add(traverser);
+                }
             }
         }
 
@@ -152,10 +159,10 @@ public final class TraverserExecutor {
                     final TraverserSet<Object> barrierSet = 
barrier.nextBarrier();
                     
IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> 
{
                         traverser.addLabels(step.getLabels());  // this might 
need to be generalized for working with global barriers too
-                        if (traverser.isHalted()) {
+                        if (traverser.isHalted() && ((!(traverser.get() 
instanceof Element) && !(traverser.get() instanceof Property)) || 
getHostingVertex(traverser.get()).equals(vertex))) {
                             traverser.detach();
                             if (returnHaltedTraversers)
-                                
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new 
TraverserSet<>(traverser.split()));
+                                
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new 
TraverserSet<>(traverser));
                             else
                                 haltedTraversers.add(traverser);
                         } else {
@@ -174,10 +181,10 @@ public final class TraverserExecutor {
             }
         } else { // LOCAL PROCESSING
             step.forEachRemaining(traverser -> {
-                if (traverser.isHalted()) {
+                if (traverser.isHalted() && ((!(traverser.get() instanceof 
Element) && !(traverser.get() instanceof Property)) || 
getHostingVertex(traverser.get()).equals(vertex))) {
                     traverser.detach();
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, 
new TraverserSet<>(traverser.split()));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, 
new TraverserSet<>(traverser));
                     else
                         haltedTraversers.add(traverser);
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
index 57e1a3e..b599c42 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
@@ -21,9 +21,12 @@ package 
org.apache.tinkerpop.gremlin.process.computer.traversal.step;
 
 import org.apache.tinkerpop.gremlin.process.computer.Computer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 
+import java.util.Optional;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -48,10 +51,11 @@ public interface VertexComputing {
     /**
      * Generate the {@link VertexProgram}.
      *
-     * @param graph the {@link Graph} that the program will be executed over.
+     * @param graph          the {@link Graph} that the program will be 
executed over.
+     * @param memoryOptional a reference to a {@link Memory} from the previous 
OLAP job if it exists.
      * @return the generated vertex program instance.
      */
-    public VertexProgram generateProgram(final Graph graph);
+    public VertexProgram generateProgram(final Graph graph, final 
Optional<Memory> memoryOptional);
 
     /**
      * @deprecated As of release 3.2.1. Please use {@link 
VertexComputing#getComputer()}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
index 5d10e67..660f5b8 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
@@ -19,8 +19,9 @@
 
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import 
org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.lambda.HaltedTraversersCountTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
@@ -37,8 +38,8 @@ import 
org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -83,7 +84,7 @@ public final class PageRankVertexProgramStep extends 
VertexProgramStep implement
     }
 
     @Override
-    public PageRankVertexProgram generateProgram(final Graph graph) {
+    public PageRankVertexProgram generateProgram(final Graph graph, final 
Optional<Memory> memoryOptional) {
         final Traversal.Admin<Vertex, Edge> detachedTraversal = 
this.edgeTraversal.getPure();
         
detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
         final PageRankVertexProgram.Builder builder = 
PageRankVertexProgram.build()

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
index f42ce93..2ae36a5 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
@@ -19,7 +19,7 @@
 
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import 
org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
@@ -36,6 +36,7 @@ import 
org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -84,9 +85,15 @@ public final class PeerPressureVertexProgramStep extends 
VertexProgramStep imple
     }
 
     @Override
-    public PeerPressureVertexProgram generateProgram(final Graph graph) {
+    public PeerPressureVertexProgram generateProgram(final Graph graph, final 
Optional<Memory> memoryOptional) {
         final Traversal.Admin<Vertex, Edge> detachedTraversal = 
this.edgeTraversal.getPure();
         
detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
+        /*
+                memoryOptional.ifPresent(memory -> {
+            if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
+                builder.configure(TraversalVertexProgram.HALTED_TRAVERSERS, 
memory.get(TraversalVertexProgram.HALTED_TRAVERSERS));
+        });
+         */
         return PeerPressureVertexProgram.build()
                 .property(this.clusterProperty)
                 .maxIterations(this.times)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
index a1342b6..d060c99 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
@@ -20,7 +20,9 @@
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
 import org.apache.commons.configuration.MapConfiguration;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -29,6 +31,7 @@ import 
org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -48,11 +51,16 @@ public final class ProgramVertexProgramStep extends 
VertexProgramStep {
     }
 
     @Override
-    public VertexProgram generateProgram(final Graph graph) {
+    public VertexProgram generateProgram(final Graph graph, final 
Optional<Memory> memoryOptional) {
         final MapConfiguration base = new MapConfiguration(this.configuration);
         base.setDelimiterParsingDisabled(true);
         PureTraversal.storeState(base, ROOT_TRAVERSAL, 
TraversalHelper.getRootTraversal(this.getTraversal()).clone());
         base.setProperty(STEP_ID, this.getId());
+        memoryOptional.ifPresent(memory -> {
+            if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
+                TraversalVertexProgram.storeHaltedTraversers(base, 
memory.get(TraversalVertexProgram.HALTED_TRAVERSERS));
+            }
+        });
         return VertexProgram.createVertexProgram(graph, base);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
index 64b9097..5851f35 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -32,6 +33,7 @@ import 
org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -62,15 +64,18 @@ public final class TraversalVertexProgramStep extends 
VertexProgramStep implemen
     }
 
     @Override
-    public TraversalVertexProgram generateProgram(final Graph graph) {
+    public TraversalVertexProgram generateProgram(final Graph graph, final 
Optional<Memory> memoryOptional) {
         final Traversal.Admin<?, ?> computerSpecificTraversal = 
this.computerTraversal.getPure();
         
computerSpecificTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()).clone());
         
this.getTraversal().getStrategies().toList().forEach(computerSpecificTraversal.getStrategies()::addStrategies);
         computerSpecificTraversal.setSideEffects(new 
MemoryTraversalSideEffects(this.getTraversal().getSideEffects()));
         computerSpecificTraversal.setParent(this);
-        return TraversalVertexProgram.build()
-                .traversal(computerSpecificTraversal)
-                .create(graph);
+        final TraversalVertexProgram.Builder builder = 
TraversalVertexProgram.build().traversal(computerSpecificTraversal);
+        memoryOptional.ifPresent(memory -> {
+            if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
+                
builder.haltedTraversers(memory.get(TraversalVertexProgram.HALTED_TRAVERSERS));
+        });
+        return builder.create(graph);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index c535d30..ac3de30 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -25,7 +25,6 @@ import 
org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.VertexComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
@@ -33,12 +32,11 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
-import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import 
org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
 
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -66,14 +64,15 @@ public abstract class VertexProgramStep extends 
AbstractStep<ComputerResult, Com
             if (this.first && this.getPreviousStep() instanceof EmptyStep) {
                 this.first = false;
                 final Graph graph = this.getTraversal().getGraph().get();
-                future = 
this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+                future = 
this.generateComputer(graph).program(this.generateProgram(graph, 
Optional.empty())).submit();
                 final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return 
this.getTraversal().getTraverserGenerator().generate(result, this, 1l);
             } else {
                 final Traverser.Admin<ComputerResult> traverser = 
this.starts.next();
                 final Graph graph = traverser.get().graph();
-                future = 
this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+                final Memory memory = traverser.get().memory();
+                future = 
this.generateComputer(graph).program(this.generateProgram(graph, 
Optional.of(memory))).submit();
                 final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return traverser.split(result, this);
@@ -122,21 +121,13 @@ public abstract class VertexProgramStep extends 
AbstractStep<ComputerResult, Com
         final TraversalSideEffects sideEffects = 
this.getTraversal().getSideEffects();
         for (final String key : memory.keys()) {
             if (sideEffects.exists(key)) {
+                // halted traversers should never be propagated through 
sideEffects
+                assert !key.equals(TraversalVertexProgram.HALTED_TRAVERSERS);
                 sideEffects.set(key, memory.get(key));
             }
         }
-        if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS) && 
!this.isEndStep()) {
-            final TraverserSet<Object> haltedTraversers = 
memory.get(TraversalVertexProgram.HALTED_TRAVERSERS);
-            if (!haltedTraversers.isEmpty()) {
-                if 
(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
-                    sideEffects.set(TraversalVertexProgram.HALTED_TRAVERSERS, 
haltedTraversers);
-                else
-                    
sideEffects.register(TraversalVertexProgram.HALTED_TRAVERSERS, new 
ConstantSupplier<>(haltedTraversers), Operator.addAll);
-            }
-        }
     }
 
-
     protected boolean isEndStep() {
         return this.getNextStep() instanceof ComputerResultStep || 
(this.getNextStep() instanceof ProfileStep && this.getNextStep().getNextStep() 
instanceof ComputerResultStep);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
index 90819d7..e4cf5ff 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -59,7 +60,7 @@ public final class GraphFilterStrategy extends 
AbstractTraversalStrategy<Travers
             return;
         final Graph graph = 
traversal.getGraph().orElse(EmptyGraph.instance()); // given that this strategy 
only works for single OLAP jobs, the graph is the traversal graph
         for (final TraversalVertexProgramStep step : 
TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal)) { 
  // will be zero or one step
-            final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph).getTraversal().get().clone();
+            final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph, Optional.empty()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
             final Computer computer = step.getComputer();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
index 1a916ca..1e936c8 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
@@ -20,7 +20,6 @@ package 
org.apache.tinkerpop.gremlin.process.traversal.step.util;
 
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
 import org.apache.tinkerpop.gremlin.process.traversal.Pop;
-import org.apache.tinkerpop.gremlin.structure.Element;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -177,7 +176,7 @@ public class ImmutablePath implements Path, 
ImmutablePathImpl, Serializable, Clo
     }
 
 
-    private static class TailPath implements Path, ImmutablePathImpl {
+    private static class TailPath implements Path, ImmutablePathImpl, 
Serializable {
         private static final TailPath INSTANCE = new TailPath();
 
         private TailPath() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
----------------------------------------------------------------------
diff --git 
a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
 
b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
index bda6819..e27e8c0 100644
--- 
a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
+++ 
b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
@@ -69,5 +69,10 @@ public abstract class GroovyPageRankTest {
         public Traversal<Vertex, Map<Object, List<Vertex>>> 
get_g_V_outXcreatedX_groupXmX_byXlabelX_pageRankX1X_byXpageRankX_byXinEX_timesX1X_inXcreatedX_groupXmX_byXpageRankX_capXmX()
 {
             new ScriptTraversal<>(g, "gremlin-groovy", 
"g.V.out('created').group('m').by(label).pageRank(1.0).by('pageRank').by(inE()).times(1).in('created').group('m').by('pageRank').cap('m')")
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, Object>> 
get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX()
 {
+            new ScriptTraversal<>(g, "gremlin-groovy", 
"g.V.out('created').pageRank().by(bothE()).by('projectRank').valueMap('name','projectRank')")
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
index df4b802..5a3ed9b 100644
--- 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
+++ 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -48,6 +49,8 @@ public abstract class PageRankTest extends 
AbstractGremlinProcessTest {
 
     public abstract Traversal<Vertex, Vertex> get_g_V_pageRank();
 
+    public abstract Traversal<Vertex, Map<String, Object>> 
get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX();
+
     public abstract Traversal<Vertex, String> 
get_g_V_pageRank_order_byXpageRank_decrX_name();
 
     public abstract Traversal<Vertex, String> 
get_g_V_pageRank_order_byXpageRank_decrX_name_limitX2X();
@@ -78,6 +81,20 @@ public abstract class PageRankTest extends 
AbstractGremlinProcessTest {
 
     @Test
     @LoadGraphWith(MODERN)
+    public void 
g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX()
 {
+        final Traversal<Vertex, Map<String, Object>> traversal = 
get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX();
+        printTraversalForm(traversal);
+        final Map<String, Double> map = new HashMap<>();
+        traversal.forEachRemaining(m -> map.put(((List<String>) 
m.get("name")).get(0), ((List<Double>) m.get("projectRank")).get(0)));
+        assertEquals(2, map.size());
+        assertTrue(map.containsKey("lop"));
+        assertTrue(map.containsKey("ripple"));
+        assertTrue(map.get("lop") > map.get("ripple"));
+        assertFalse(traversal.hasNext());
+    }
+
+    @Test
+    @LoadGraphWith(MODERN)
     public void g_V_pageRank_order_byXpageRank_decrX_name() {
         final Traversal<Vertex, String> traversal = 
get_g_V_pageRank_order_byXpageRank_decrX_name();
         printTraversalForm(traversal);
@@ -225,6 +242,11 @@ public abstract class PageRankTest extends 
AbstractGremlinProcessTest {
         }
 
         @Override
+        public Traversal<Vertex, Map<String, Object>> 
get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX()
 {
+            return 
g.V().out("created").pageRank().by(__.bothE()).by("projectRank").valueMap("name",
 "projectRank");
+        }
+
+        @Override
         public Traversal<Vertex, Map<String, List<Object>>> 
get_g_V_pageRank_byXoutEXknowsXX_byXfriendRankX_valueMapXname_friendRankX() {
             return 
g.V().pageRank().by(__.outE("knows")).by("friendRank").valueMap("name", 
"friendRank");
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
index 2c505e7..8b2a293 100644
--- 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
+++ 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
@@ -59,6 +59,7 @@ import java.util.Set;
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -160,6 +161,7 @@ public abstract class ProgramTest extends 
AbstractGremlinProcessTest {
     public static class TestProgram implements VertexProgram {
 
         private PureTraversal<?, ?> traversal = new 
PureTraversal<>(EmptyTraversal.instance());
+        private TraverserSet<Object> haltedTraversers;
         private Step programStep = EmptyStep.instance();
 
         private final Set<MemoryComputeKey> memoryComputeKeys = new 
HashSet<>();
@@ -168,6 +170,7 @@ public abstract class ProgramTest extends 
AbstractGremlinProcessTest {
         public void loadState(final Graph graph, final Configuration 
configuration) {
             VertexProgram.super.loadState(graph, configuration);
             this.traversal = PureTraversal.loadState(configuration, 
VertexProgramStep.ROOT_TRAVERSAL, graph);
+            this.haltedTraversers = 
TraversalVertexProgram.loadHaltedTraversers(configuration);
             this.programStep = new 
TraversalMatrix<>(this.traversal.get()).getStepById(configuration.getString(ProgramVertexProgramStep.STEP_ID));
             
this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
             
this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS,
 Operator.addAll, false, false));
@@ -178,24 +181,24 @@ public abstract class ProgramTest extends 
AbstractGremlinProcessTest {
         public void storeState(final Configuration configuration) {
             VertexProgram.super.storeState(configuration);
             this.traversal.storeState(configuration, 
VertexProgramStep.ROOT_TRAVERSAL);
+            TraversalVertexProgram.storeHaltedTraversers(configuration, 
this.haltedTraversers);
             configuration.setProperty(ProgramVertexProgramStep.STEP_ID, 
this.programStep.getId());
         }
 
         @Override
         public void setup(final Memory memory) {
             
MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, 
MemoryTraversalSideEffects.State.SETUP);
-            final TraversalSideEffects sideEffects = 
this.traversal.get().getSideEffects();
-            final TraverserSet<Object> haltedTraversers = 
sideEffects.get(TraversalVertexProgram.HALTED_TRAVERSERS);
-            sideEffects.remove(TraversalVertexProgram.HALTED_TRAVERSERS);
-            this.checkSideEffects(MemoryTraversalSideEffects.State.SETUP);
-            final Map<Vertex, Long> map = (Map<Vertex, Long>) 
haltedTraversers.iterator().next().get();
+            final Map<Vertex, Long> map = (Map<Vertex, Long>) 
this.haltedTraversers.iterator().next().get();
             assertEquals(2, map.size());
             assertTrue(map.values().contains(3l));
             assertTrue(map.values().contains(1l));
             final TraverserSet<Object> activeTraversers = new TraverserSet<>();
-            map.keySet().forEach(vertex -> 
activeTraversers.add(haltedTraversers.peek().split(vertex, 
EmptyStep.instance())));
+            map.keySet().forEach(vertex -> 
activeTraversers.add(this.haltedTraversers.peek().split(vertex, 
EmptyStep.instance())));
+            this.haltedTraversers.clear();
+            this.checkSideEffects();
             memory.set(TraversalVertexProgram.ACTIVE_TRAVERSERS, 
activeTraversers);
 
+
         }
 
         @Override
@@ -203,7 +206,7 @@ public abstract class ProgramTest extends 
AbstractGremlinProcessTest {
             
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             final TraverserGenerator generator = 
this.traversal.get().getTraverserGenerator();
             
MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, 
MemoryTraversalSideEffects.State.EXECUTE);
-            this.checkSideEffects(MemoryTraversalSideEffects.State.EXECUTE);
+            this.checkSideEffects();
             final TraverserSet<Vertex> activeTraversers = 
memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
             if (vertex.label().equals("software")) {
                 assertEquals(1, activeTraversers.stream().filter(v -> 
v.get().equals(vertex)).count());
@@ -231,7 +234,7 @@ public abstract class ProgramTest extends 
AbstractGremlinProcessTest {
         public boolean terminate(final Memory memory) {
             final TraverserGenerator generator = 
this.traversal.get().getTraverserGenerator();
             
MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, 
MemoryTraversalSideEffects.State.TERMINATE);
-            checkSideEffects(MemoryTraversalSideEffects.State.TERMINATE);
+            checkSideEffects();
             if (memory.isInitialIteration()) {
                 
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
                 return false;
@@ -248,16 +251,18 @@ public abstract class ProgramTest extends 
AbstractGremlinProcessTest {
 
         @Override
         public void workerIterationStart(final Memory memory) {
+            assertNotNull(this.haltedTraversers);
+            this.haltedTraversers.clear();
             
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             
MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, 
MemoryTraversalSideEffects.State.WORKER_ITERATION_START);
-            
checkSideEffects(MemoryTraversalSideEffects.State.WORKER_ITERATION_START);
+            checkSideEffects();
         }
 
         @Override
         public void workerIterationEnd(final Memory memory) {
             
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             
MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, 
MemoryTraversalSideEffects.State.WORKER_ITERATION_END);
-            
checkSideEffects(MemoryTraversalSideEffects.State.WORKER_ITERATION_END);
+            checkSideEffects();
         }
 
         @Override
@@ -299,16 +304,13 @@ public abstract class ProgramTest extends 
AbstractGremlinProcessTest {
 
         ////////
 
-        private void checkSideEffects(final MemoryTraversalSideEffects.State 
state) {
+        private void checkSideEffects() {
+            assertEquals(0, this.haltedTraversers.size());
+            assertTrue(this.haltedTraversers.isEmpty());
             final TraversalSideEffects sideEffects = 
this.traversal.get().getSideEffects();
             assertTrue(sideEffects instanceof MemoryTraversalSideEffects);
-            if (state.masterState()) {
-                assertEquals(1, sideEffects.keys().size());
-                
assertFalse(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
-            } else {
-                assertEquals(2, sideEffects.keys().size());
-                
assertTrue(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
-            }
+            assertEquals(1, sideEffects.keys().size());
+            
assertFalse(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             assertTrue(sideEffects.exists("x"));
             final BulkSet<String> bulkSet = sideEffects.get("x");
             assertEquals(4, bulkSet.size());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 9a9e934..9e05e53 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -264,14 +264,13 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
                             throw new IllegalStateException(e.getMessage());
                         }
                     } else {  // standard GraphComputer semantics
+                        // get a configuration that will be propagated to all 
workers
+                        final HadoopConfiguration vertexProgramConfiguration = 
new HadoopConfiguration();
+                        
this.vertexProgram.storeState(vertexProgramConfiguration);
                         // set up the vertex program and wire up configurations
                         this.vertexProgram.setup(memory);
                         JavaPairRDD<Object, ViewIncomingPayload<Object>> 
viewIncomingRDD = null;
                         memory.broadcastMemory(sparkContext);
-                        final HadoopConfiguration vertexProgramConfiguration = 
new HadoopConfiguration();
-                        
this.vertexProgram.storeState(vertexProgramConfiguration);
-                        ConfigurationUtils.copy(vertexProgramConfiguration, 
apacheConfiguration);
-                        
ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, 
hadoopConfiguration);
                         // execute the vertex program
                         while (true) {
                             if (Thread.interrupted()) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
index 19d21b3..a2cc518 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
@@ -20,6 +20,7 @@
 package 
org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization;
 
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
@@ -29,6 +30,8 @@ import 
org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.op
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
 
+import java.util.Optional;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -43,7 +46,7 @@ public final class SparkInterceptorStrategy extends 
AbstractTraversalStrategy<Tr
     public void apply(final Traversal.Admin<?, ?> traversal) {
         final Graph graph = 
traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the 
graph will be as it's dynamically determined
         for (final TraversalVertexProgramStep step : 
TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal)) {
-            final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph).getTraversal().get().clone();
+            final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph, Optional.empty()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
             if (SparkStarBarrierInterceptor.isLegal(computerTraversal)) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
index 153282e..280c7ba 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
@@ -20,6 +20,7 @@
 package 
org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization;
 
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -36,6 +37,7 @@ import 
org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
 
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -59,7 +61,7 @@ public final class SparkSingleIterationStrategy extends 
AbstractTraversalStrateg
     public void apply(final Traversal.Admin<?, ?> traversal) {
         final Graph graph = 
traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the 
graph will be as it's dynamically determined
         for (final TraversalVertexProgramStep step : 
TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal)) {
-            final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph).getTraversal().get().clone();
+            final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph, Optional.empty()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
             boolean doesMessagePass = 
TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, 
MULTI_ITERATION_CLASSES, computerTraversal);

Reply via email to