Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1310 [created] b72ea666c


TraversalVertexProgram can use DetachedFactory or ReferenceFactory when sending 
halted traversers to the master traversal. The default is to use 
ReferenceFactory (more efficient), but if someone wants to use DetachedFactory, 
then they do 
g.withComputer().withStrategies(HaltedTraverserFactoryStrategy.detach()). 
Easy peasy lemon squeezy. Still need to write specific test cases, but manual 
testing shows things working.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b72ea666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b72ea666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b72ea666

Branch: refs/heads/TINKERPOP-1310
Commit: b72ea666ce18920a8dd7d1d4c30bbe029fe4ae80
Parents: afd4048
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Tue May 24 16:50:19 2016 -0600
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Tue May 24 16:50:19 2016 -0600

----------------------------------------------------------------------
 .../computer/traversal/MasterExecutor.java      | 32 +++++++-----
 .../traversal/TraversalVertexProgram.java       | 30 +++++++----
 .../computer/traversal/WorkerExecutor.java      | 44 +++++++++-------
 .../step/map/TraversalVertexProgramStep.java    | 10 +++-
 .../HaltedTraverserFactoryStrategy.java         | 53 ++++++++++++++++++++
 5 files changed, 126 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b72ea666/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
index 88570fe..b5ec12b 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
@@ -42,6 +42,7 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.HashSet;
@@ -57,6 +58,16 @@ final class MasterExecutor {
 
     }
 
+    protected static <R> Traverser.Admin<R> detach(final Traverser.Admin<R> 
traverser, final Class haltedTraverserFactory) {
+        if (haltedTraverserFactory.equals(DetachedFactory.class))
+            traverser.set(DetachedFactory.detach(traverser.get(), true));
+        else if (haltedTraverserFactory.equals(ReferenceFactory.class))
+            traverser.set(ReferenceFactory.detach(traverser.get()));
+        else
+            throw new IllegalArgumentException("The following detaching 
factory is unknown: " + haltedTraverserFactory);
+        return traverser;
+    }
+
     protected static void processMemory(final TraversalMatrix<?, ?> 
traversalMatrix, final Memory memory, final TraverserSet<Object> traverserSet, 
final Set<String> completedBarriers) {
         if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
             for (final String key : 
memory.<Set<String>>get(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
@@ -82,7 +93,8 @@ final class MasterExecutor {
                                             final TraversalMatrix<?, ?> 
traversalMatrix,
                                             TraverserSet<Object> 
toProcessTraversers,
                                             final TraverserSet<Object> 
remoteActiveTraversers,
-                                            final TraverserSet<Object> 
haltedTraversers) {
+                                            final TraverserSet<Object> 
haltedTraversers,
+                                            final Class 
haltedTraverserFactory) {
 
 
         while (!toProcessTraversers.isEmpty()) {
@@ -96,23 +108,19 @@ final class MasterExecutor {
                 traverser.set(DetachedFactory.detach(traverser.get(), true));
                 traverser.setSideEffects(traversal.get().getSideEffects());
                 if (traverser.isHalted()) {
-                    traverser.detach();
-                    haltedTraversers.add(traverser);
+                    haltedTraversers.add(MasterExecutor.detach(traverser, 
haltedTraverserFactory));
                 } else if (isRemoteTraverser(traverser, traversalMatrix)) {  
// this is so that patterns like order().name work as expected.
-                    traverser.detach();
-                    remoteActiveTraversers.add(traverser);
+                    remoteActiveTraversers.add(traverser.detach());
                 } else {
                     currentStep = 
traversalMatrix.getStepById(traverser.getStepId());
                     if (!currentStep.getId().equals(previousStep.getId()) && 
!(previousStep instanceof EmptyStep)) {
                         while (previousStep.hasNext()) {
                             final Traverser.Admin<Object> result = 
previousStep.next();
                             if (result.isHalted()) {
-                                result.detach();
-                                haltedTraversers.add(result);
+                                
haltedTraversers.add(MasterExecutor.detach(result, haltedTraverserFactory));
                             } else {
                                 if (isRemoteTraverser(result, 
traversalMatrix)) {
-                                    result.detach();
-                                    remoteActiveTraversers.add(result);
+                                    
remoteActiveTraversers.add(result.detach());
                                 } else
                                     localActiveTraversers.add(result);
                             }
@@ -126,12 +134,10 @@ final class MasterExecutor {
                 while (currentStep.hasNext()) {
                     final Traverser.Admin<Object> traverser = 
currentStep.next();
                     if (traverser.isHalted()) {
-                        traverser.detach();
-                        haltedTraversers.add(traverser);
+                        haltedTraversers.add(MasterExecutor.detach(traverser, 
haltedTraverserFactory));
                     } else {
                         if (isRemoteTraverser(traverser, traversalMatrix)) {
-                            traverser.detach();
-                            remoteActiveTraversers.add(traverser);
+                            remoteActiveTraversers.add(traverser.detach());
                         } else
                             localActiveTraversers.add(traverser);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b72ea666/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 266426f..d4daaac 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -64,6 +64,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 import org.apache.tinkerpop.gremlin.util.function.MutableMetricsSupplier;
 import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -93,6 +94,7 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
     public static final String TRAVERSAL = 
"gremlin.traversalVertexProgram.traversal";
     public static final String HALTED_TRAVERSERS = 
"gremlin.traversalVertexProgram.haltedTraversers";
     public static final String ACTIVE_TRAVERSERS = 
"gremlin.traversalVertexProgram.activeTraversers";
+    public static final String HALTED_TRAVERSER_FACTORY = 
"gremlin.traversalVertexProgram.haltedTraverserFactory";
     protected static final String MUTATED_MEMORY_KEYS = 
"gremlin.traversalVertexProgram.mutatedMemoryKeys";
     private static final String VOTE_TO_HALT = 
"gremlin.traversalVertexProgram.voteToHalt";
     private static final String COMPLETED_BARRIERS = 
"gremlin.traversalVertexProgram.completedBarriers";
@@ -108,6 +110,7 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
     private final Set<MapReduce> mapReducers = new HashSet<>();
     private TraverserSet<Object> haltedTraversers;
     private boolean returnHaltedTraversers = false;
+    private Class haltedTraverserDetachFactory;
 
     private TraversalVertexProgram() {
     }
@@ -164,6 +167,8 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
                         
this.traversal.get().getParent().asStep().getNextStep() instanceof EmptyStep || 
 // same as above, but if using TraversalVertexProgramStep directly
                         
(this.traversal.get().getParent().asStep().getNextStep() instanceof ProfileStep 
&& // same as above, but needed for profiling
                                 
this.traversal.get().getParent().asStep().getNextStep().getNextStep() 
instanceof ComputerResultStep));
+        // determine how to store halted traversers
+        this.haltedTraverserDetachFactory = 
configuration.containsKey(HALTED_TRAVERSER_FACTORY) ? (Class) 
configuration.getProperty(HALTED_TRAVERSER_FACTORY) : ReferenceFactory.class;
         // register traversal side-effects in memory
         
this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
         // register MapReducer memory compute keys
@@ -191,6 +196,7 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
     public void storeState(final Configuration configuration) {
         VertexProgram.super.storeState(configuration);
         this.traversal.storeState(configuration, TRAVERSAL);
+        configuration.setProperty(HALTED_TRAVERSER_FACTORY, 
this.haltedTraverserDetachFactory);
         TraversalVertexProgram.storeHaltedTraversers(configuration, 
this.haltedTraversers);
     }
 
@@ -211,7 +217,7 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
             });
             assert haltedTraversers.isEmpty();
             final TraverserSet<Object> remoteActiveTraversers = new 
TraverserSet<>();
-            MasterExecutor.processTraversers(this.traversal, 
this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, 
this.haltedTraversers);
+            MasterExecutor.processTraversers(this.traversal, 
this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, 
this.haltedTraversers, this.haltedTraverserDetachFactory);
             memory.set(HALTED_TRAVERSERS, this.haltedTraversers);
             memory.set(ACTIVE_TRAVERSERS, remoteActiveTraversers);
         } else {
@@ -265,18 +271,17 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
                     graphStep.setIteratorSupplier(() -> (Iterator) 
IteratorUtils.filter(vertex.edges(Direction.OUT), edge -> 
ElementHelper.idExists(edge.id(), graphStep.getIds())));
                 graphStep.forEachRemaining(traverser -> {
                     if (traverser.isHalted()) {
-                        traverser.detach();
                         if (this.returnHaltedTraversers)
-                            memory.add(HALTED_TRAVERSERS, new 
TraverserSet<>(traverser));
+                            memory.add(HALTED_TRAVERSERS, new 
TraverserSet<>(MasterExecutor.detach(traverser, 
this.haltedTraverserDetachFactory)));
                         else
-                            haltedTraversers.add((Traverser.Admin) traverser);
+                            haltedTraversers.add((Traverser.Admin) 
traverser.detach());
                     } else
                         activeTraversers.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || 
WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, 
activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers));
+            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || 
WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, 
activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers, 
this.haltedTraverserDetachFactory));
         } else {  // ITERATION 1+
-            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, 
this.traversalMatrix, memory, this.returnHaltedTraversers));
+            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, 
this.traversalMatrix, memory, this.returnHaltedTraversers, 
this.haltedTraverserDetachFactory));
         }
         if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
             vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();
@@ -298,7 +303,7 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
             final Set<String> completedBarriers = new HashSet<>();
             MasterExecutor.processMemory(this.traversalMatrix, memory, 
toProcessTraversers, completedBarriers);
             // process all results from barriers locally and when elements are 
touched, put them in remoteActiveTraversers
-            MasterExecutor.processTraversers(this.traversal, 
this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, 
haltedTraversers);
+            MasterExecutor.processTraversers(this.traversal, 
this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, 
haltedTraversers, this.haltedTraverserDetachFactory);
             // tell parallel barriers that might not have been active in the 
last round that they are no longer active
             memory.set(COMPLETED_BARRIERS, completedBarriers);
             if (!remoteActiveTraversers.isEmpty() ||
@@ -308,11 +313,9 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
                 return false;
             } else {
                 // finalize locally with any last traversers dangling in the 
local traversal
-                final Step<?, ?> endStep = this.traversal.get().getEndStep();
+                final Step<?, Object> endStep = (Step<?, Object>) 
this.traversal.get().getEndStep();
                 while (endStep.hasNext()) {
-                    final Traverser.Admin traverser = endStep.next();
-                    traverser.detach();
-                    haltedTraversers.add(traverser);
+                    haltedTraversers.add(MasterExecutor.detach(endStep.next(), 
this.haltedTraverserDetachFactory));
                 }
                 // the result of a TraversalVertexProgram are the halted 
traversers
                 memory.set(HALTED_TRAVERSERS, haltedTraversers);
@@ -409,6 +412,11 @@ public final class TraversalVertexProgram implements 
VertexProgram<TraverserSet<
             return this;
         }
 
+        public Builder haltedTraverserFactory(final Class detachFactory) {
+            this.configuration.setProperty(HALTED_TRAVERSER_FACTORY, 
detachFactory);
+            return this;
+        }
+
         public Builder traversal(final TraversalSource traversalSource, final 
String scriptEngine, final String traversalScript, final Object... bindings) {
             return this.traversal(new ScriptTraversal<>(traversalSource, 
scriptEngine, traversalScript, bindings));
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b72ea666/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
index c4bd659..5bc3da9 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Collections;
@@ -51,7 +52,12 @@ final class WorkerExecutor {
 
     }
 
-    protected static boolean execute(final Vertex vertex, final 
Messenger<TraverserSet<Object>> messenger, final TraversalMatrix<?, ?> 
traversalMatrix, final Memory memory, final boolean returnHaltedTraversers) {
+    protected static boolean execute(final Vertex vertex,
+                                     final Messenger<TraverserSet<Object>> 
messenger,
+                                     final TraversalMatrix<?, ?> 
traversalMatrix,
+                                     final Memory memory,
+                                     final boolean returnHaltedTraversers,
+                                     final Class haltedTraverserFactory) {
 
         final TraversalSideEffects traversalSideEffects = 
traversalMatrix.getTraversal().getSideEffects();
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
@@ -93,7 +99,7 @@ final class WorkerExecutor {
                 traversers.remove();
                 if (traverser.isHalted()) {
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, 
new TraverserSet<>(traverser));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, 
new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
                     else
                         haltedTraversers.add(traverser);
                 } else {
@@ -118,11 +124,11 @@ final class WorkerExecutor {
                 traversers.remove();
                 final Step<Object, Object> currentStep = 
traversalMatrix.getStepById(traverser.getStepId());
                 if (!currentStep.getId().equals(previousStep.getId()) && 
!(previousStep instanceof EmptyStep))
-                    WorkerExecutor.drainStep(vertex, previousStep, 
activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
+                    WorkerExecutor.drainStep(vertex, previousStep, 
activeTraversers, haltedTraversers, memory, returnHaltedTraversers, 
haltedTraverserFactory);
                 currentStep.addStart(traverser);
                 previousStep = currentStep;
             }
-            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, 
haltedTraversers, memory, returnHaltedTraversers);
+            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, 
haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory);
             assert toProcessTraversers.isEmpty();
             // process all the local objects and send messages or store 
locally again
             if (!activeTraversers.isEmpty()) {
@@ -135,8 +141,7 @@ final class WorkerExecutor {
                         final Vertex hostingVertex = 
WorkerExecutor.getHostingVertex(traverser.get());
                         if (!vertex.equals(hostingVertex)) { // necessary for 
path access
                             voteToHalt.set(false);
-                            traverser.detach();
-                            
messenger.sendMessage(MessageScope.Global.of(hostingVertex), new 
TraverserSet<>(traverser));
+                            
messenger.sendMessage(MessageScope.Global.of(hostingVertex), new 
TraverserSet<>(traverser.detach()));
                         } else {
                             if (traverser.get() instanceof Attachable)   // 
necessary for path access to local object
                                 
traverser.attach(Attachable.Method.get(vertex));
@@ -151,7 +156,13 @@ final class WorkerExecutor {
         return voteToHalt.get();
     }
 
-    private static void drainStep(final Vertex vertex, final Step<Object, 
Object> step, final TraverserSet<Object> activeTraversers, final 
TraverserSet<Object> haltedTraversers, final Memory memory, final boolean 
returnHaltedTraversers) {
+    private static void drainStep(final Vertex vertex,
+                                  final Step<Object, Object> step,
+                                  final TraverserSet<Object> activeTraversers,
+                                  final TraverserSet<Object> haltedTraversers,
+                                  final Memory memory,
+                                  final boolean returnHaltedTraversers,
+                                  final Class haltedTraverserFactory) {
         if (step instanceof Barrier) {
             if (step instanceof Bypassing)
                 ((Bypassing) step).setBypass(true);
@@ -167,15 +178,12 @@ final class WorkerExecutor {
                                 (returnHaltedTraversers ||
                                         (!(traverser.get() instanceof Element) 
&& !(traverser.get() instanceof Property)) ||
                                         
getHostingVertex(traverser.get()).equals(vertex))) {
-                            traverser.detach();
                             if (returnHaltedTraversers)
-                                
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new 
TraverserSet<>(traverser));
+                                
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new 
TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
                             else
-                                haltedTraversers.add(traverser);
-                        } else {
-                            traverser.detach();
-                            traverserSet.add(traverser);
-                        }
+                                haltedTraversers.add(traverser.detach());
+                        } else
+                            traverserSet.add(traverser.detach());
                     });
                 }
                 memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new 
HashSet<>(Collections.singleton(step.getId())));
@@ -189,14 +197,14 @@ final class WorkerExecutor {
         } else { // LOCAL PROCESSING
             step.forEachRemaining(traverser -> {
                 if (traverser.isHalted() &&
-                        (returnHaltedTraversers ||
+                        // if its a ReferenceFactory (one less iteration)
+                        ((returnHaltedTraversers || haltedTraverserFactory == 
ReferenceFactory.class) &&
                                 (!(traverser.get() instanceof Element) && 
!(traverser.get() instanceof Property)) ||
                                 
getHostingVertex(traverser.get()).equals(vertex))) {
-                    traverser.detach();
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, 
new TraverserSet<>(traverser));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, 
new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
                     else
-                        haltedTraversers.add(traverser);
+                        haltedTraversers.add(traverser.detach());
                 } else {
                     activeTraversers.add(traverser);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b72ea666/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
index 58e44a2..2b2498b 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
@@ -30,6 +30,7 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequire
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -41,6 +42,7 @@ import java.util.Set;
 public final class TraversalVertexProgramStep extends VertexProgramStep 
implements TraversalParent {
 
     public PureTraversal<?, ?> computerTraversal;
+    private Class haltedTraverserFactory = ReferenceFactory.class;
 
     public TraversalVertexProgramStep(final Traversal.Admin traversal, final 
Traversal.Admin<?, ?> computerTraversal) {
         super(traversal);
@@ -69,7 +71,9 @@ public final class TraversalVertexProgramStep extends 
VertexProgramStep implemen
         
this.getTraversal().getStrategies().toList().forEach(computerSpecificTraversal.getStrategies()::addStrategies);
         computerSpecificTraversal.setSideEffects(new 
MemoryTraversalSideEffects(this.getTraversal().getSideEffects()));
         computerSpecificTraversal.setParent(this);
-        final TraversalVertexProgram.Builder builder = 
TraversalVertexProgram.build().traversal(computerSpecificTraversal);
+        final TraversalVertexProgram.Builder builder = 
TraversalVertexProgram.build()
+                .traversal(computerSpecificTraversal)
+                .haltedTraverserFactory(this.haltedTraverserFactory);
         if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
             
builder.haltedTraversers(memory.get(TraversalVertexProgram.HALTED_TRAVERSERS));
         return builder.create(graph);
@@ -96,6 +100,10 @@ public final class TraversalVertexProgramStep extends 
VertexProgramStep implemen
         this.integrateChild(this.computerTraversal.get());
     }
 
+    public void setHaltedTraverserFactory(final Class 
haltedTraverserDetachFactory) {
+        this.haltedTraverserFactory = haltedTraverserDetachFactory;
+    }
+
     /*@Override
     public int hashCode() {
         return super.hashCode() ^ this.computerTraversal.hashCode();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b72ea666/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
new file mode 100644
index 0000000..44542c7
--- /dev/null
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration;
+
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class HaltedTraverserFactoryStrategy extends 
AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> implements 
TraversalStrategy.DecorationStrategy {
+
+    private final Class haltedTraverserFactory;
+
+    private HaltedTraverserFactoryStrategy(final Class haltedTraverserFactory) 
{
+        this.haltedTraverserFactory = haltedTraverserFactory;
+    }
+
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        
TraversalHelper.getStepsOfAssignableClass(TraversalVertexProgramStep.class, 
traversal)
+                .forEach(step -> 
step.setHaltedTraverserFactory(this.haltedTraverserFactory));
+    }
+
+    public static HaltedTraverserFactoryStrategy detach() {
+        return new HaltedTraverserFactoryStrategy(DetachedFactory.class);
+    }
+
+    public static HaltedTraverserFactoryStrategy reference() {
+        return new HaltedTraverserFactoryStrategy(ReferenceFactory.class);
+    }
+}

Reply via email to