Added SingleIterationStrategy, which is able to rewrite a set of traversals to 
not use message passing in OLAP. This is significant for all GraphComputers, as 
message passing is expensive, and especially for SparkGraphComputer because, without 
message passing, there is no need to cache or partition the graph once loaded.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f6b66977
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f6b66977
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f6b66977

Branch: refs/heads/TINKERPOP-1602
Commit: f6b669778ec058a555623c6119e0feaaac59c8be
Parents: f0875d7
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Thu Jan 26 12:57:58 2017 -0700
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Fri Jan 27 14:24:18 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   3 +-
 .../step/map/TraversalVertexProgramStep.java    |   5 +
 .../optimization/SingleIterationStrategy.java   | 132 +++++++++++++++++++
 .../process/traversal/TraversalStrategies.java  |   2 +
 .../SingleIterationStrategyTest.java            | 100 ++++++++++++++
 .../SparkSingleIterationStrategy.java           |  48 ++++++-
 .../SparkSingleIterationStrategyTest.java       |  89 +++++++++++--
 7 files changed, 362 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 95cfb71..860d401 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,7 +26,8 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-* Fixed a bug where `keepLabels` were not being pushed down into child 
traversals by `PathRetractionStrategy`.
+* Fixed a bug where `PathProcessor.keepLabels` were not being pushed down into 
child traversals by `PathRetractionStrategy`.
+* Added `SingleIterationStrategy` as a default `GraphComputer` strategy that 
can re-write some traversals to not require message passing.
 * Fixed a bug associated with user-provided maps and `GroupSideEffectStep`.
 * `GroupBiOperator` no longer maintains a detached traversal and thus, no more 
side-effect related OLAP inconsistencies.
 * Added `ProjectedTraverser` which wraps a traverser with a `List<Object>` of 
projected data.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
index cb7db29..e866ce2 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
@@ -54,6 +54,11 @@ public final class TraversalVertexProgramStep extends 
VertexProgramStep implemen
         return Collections.singletonList(this.computerTraversal.get());
     }
 
+    /**
+     * Replaces the computer traversal held by this step. The supplied traversal is wrapped in a new
+     * {@code PureTraversal}, and the wrapped traversal is then registered as a child of this step via
+     * {@code integrateChild}.
+     *
+     * @param computerTraversal the traversal to use as this step's computer traversal
+     */
+    public void setComputerTraversal(final Traversal.Admin<?,?> 
computerTraversal) {
+        this.computerTraversal = new PureTraversal<>(computerTraversal);
+        this.integrateChild(this.computerTraversal.get());
+    }
+
     @Override
     public String toString() {
         return StringFactory.stepString(this, this.computerTraversal.get(), 
new GraphFilter(this.computer));

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
new file mode 100644
index 0000000..efcbe9a
--- /dev/null
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
@@ -0,0 +1,132 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An OLAP optimization that tries to rewrite the computer traversal of the first
+ * {@link TraversalVertexProgramStep} so that it no longer requires message passing.
+ * <p>
+ * A computer traversal is considered to message pass when it contains an {@code EdgeVertexStep}, a lambda
+ * map/flatMap step, or a {@code VertexStep} that returns vertices or walks non-{@code OUT} edges. If such a
+ * traversal stays within the local star graph (and passes the further guards in {@link #apply}), the steps
+ * after the start step and up to the first {@link Barrier} (or the end of the traversal) are wrapped in a
+ * {@code local()} step. For example, {@code g.V().in().id().count()} becomes
+ * {@code g.V().local(in().id()).count()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SingleIterationStrategy extends 
AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements 
TraversalStrategy.OptimizationStrategy {
+
+    private static final SingleIterationStrategy INSTANCE = new 
SingleIterationStrategy();
+
+    // optimization strategies that must be applied before this one (returned by applyPrior())
+    private static final Set<Class<? extends OptimizationStrategy>> PRIORS = 
new HashSet<>(Arrays.asList(
+            IncidentToAdjacentStrategy.class,
+            AdjacentToIncidentStrategy.class,
+            FilterRankingStrategy.class,
+            InlineFilterStrategy.class));
+
+    // steps whose presence anywhere in the computer traversal means it message passes;
+    // VertexStep is handled separately in apply() because it depends on direction/return type
+    private static final Set<Class> MULTI_ITERATION_CLASSES = new 
HashSet<>(Arrays.asList(
+            EdgeVertexStep.class,
+            LambdaFlatMapStep.class,
+            LambdaMapStep.class
+    ));
+
+
+    private SingleIterationStrategy() {
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        // only process the first traversal step in an OLAP chain
+        
TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, 
traversal).ifPresent(step -> {
+            final Graph graph = 
traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the 
graph will be as it's dynamically determined
+            final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph, 
EmptyMemory.instance()).getTraversal().get().clone();
+            // does the traversal, as it currently stands, require message passing?
+            boolean doesMessagePass = 
TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, 
MULTI_ITERATION_CLASSES, computerTraversal);
+            if (!doesMessagePass) {
+                for (final VertexStep vertexStep : 
TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, 
VertexStep.class, computerTraversal)) {
+                    if (vertexStep.returnsVertex() || 
!vertexStep.getDirection().equals(Direction.OUT)) { // adjacent vertices and in/both-edges require 
message passing in OLAP
+                        doesMessagePass = true;
+                        break;
+                    }
+                }
+            } // if the traversal doesn't message pass, then don't try to 
localize it as it's just wasted computation
+            if (doesMessagePass) {
+                final boolean beyondStarGraph =
+                        
TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, 
LambdaHolder.class, computerTraversal) ||
+                                
!TraversalHelper.isLocalStarGraph(computerTraversal);
+                if (!beyondStarGraph &&                                        
                                // if we move beyond the star graph, then 
localization is not possible.
+                        !(computerTraversal.getStartStep().getNextStep() 
instanceof EmptyStep) &&              // if it's just a g.V()/E(), then don't 
localize
+                        !(computerTraversal.getStartStep().getNextStep() 
instanceof LocalStep) &&              // removes the potential for the infinite 
recursive application of the traversal
+                        !(computerTraversal.getStartStep().getNextStep() 
instanceof Barrier) &&                // if the second step is a barrier, no 
point in trying to localize anything
+                        
!(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, 
computerTraversal). // this is a strict precaution that could be loosened with 
deeper logic on barriers in global children
+                                stream().
+                                filter(parent -> 
!parent.getGlobalChildren().isEmpty()).findAny().isPresent())) {
+
+                    // NOTE(review): the rewrite is applied to the step's pure traversal, not to the analysed clone
+                    // above; presumably so the rewritten traversal has not yet had strategies applied — confirm
+                    final Traversal.Admin<?, ?> newComputerTraversal = 
step.computerTraversal.getPure();
+                    final Traversal.Admin localTraversal = new 
DefaultGraphTraversal<>();
+                    final Step barrier = (Step) 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
newComputerTraversal).orElse(null);
+                    final Step endStep = null == barrier ? 
newComputerTraversal.getEndStep() : barrier.getPreviousStep();
+                    // NOTE(review): no localization when the section to wrap ends on a VertexStep/EdgeVertexStep
+                    // (e.g. g.V().out().count() is left as-is) — presumably because local() would not avoid
+                    // message passing there; confirm
+                    if (!(endStep instanceof VertexStep || endStep instanceof 
EdgeVertexStep)) {
+                        
TraversalHelper.removeToTraversal(newComputerTraversal.getStartStep().getNextStep(),
 null == barrier ? EmptyStep.instance() : barrier, localTraversal);
+                        assert !localTraversal.getSteps().isEmpty(); // given 
the if() constraints, an empty local traversal is impossible
+                        if (localTraversal.getSteps().size() > 1) { // if it's 
just a single step, a local wrap will not alter its locus of computation
+                            if (null == barrier)
+                                TraversalHelper.insertTraversal(0, 
(Traversal.Admin) __.local(localTraversal), newComputerTraversal);
+                            else
+                                
TraversalHelper.insertTraversal(barrier.getPreviousStep(), (Traversal.Admin) 
__.local(localTraversal), newComputerTraversal);
+                            step.setComputerTraversal(newComputerTraversal);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public Set<Class<? extends OptimizationStrategy>> applyPrior() {
+        return PRIORS;
+    }
+
+    public static SingleIterationStrategy instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
index 015df70..a1cb99c 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.traversal;
 
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy;
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
 import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy;
 import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy;
 import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
@@ -222,6 +223,7 @@ public interface TraversalStrategies extends Serializable, 
Cloneable {
             final TraversalStrategies graphComputerStrategies = new 
DefaultTraversalStrategies();
             graphComputerStrategies.addStrategies(
                     GraphFilterStrategy.instance(),
+                    SingleIterationStrategy.instance(),
                     OrderLimitStrategy.instance(),
                     PathProcessorStrategy.instance(),
                     ComputerVerificationStrategy.instance());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
 
b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
new file mode 100644
index 0000000..612fb9d
--- /dev/null
+++ 
b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
@@ -0,0 +1,100 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Parameterized tests for {@link SingleIterationStrategy}. Each case places the {@code original} traversal
+ * inside a {@link TraversalVertexProgramStep} of a root traversal, applies {@link SingleIterationStrategy}
+ * together with {@code otherStrategies}, and asserts that the step's computer traversal equals
+ * {@code optimized}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(Parameterized.class)
+public class SingleIterationStrategyTest {
+
+    @Parameterized.Parameter(value = 0)
+    public Traversal original;
+
+    @Parameterized.Parameter(value = 1)
+    public Traversal optimized;
+
+    @Parameterized.Parameter(value = 2)
+    public Collection<TraversalStrategy> otherStrategies;
+
+    @Test
+    public void doTest() {
+        final Traversal.Admin<?, ?> rootTraversal = new 
DefaultGraphTraversal<>();
+        final TraversalVertexProgramStep parent = new 
TraversalVertexProgramStep(rootTraversal, this.original.asAdmin());
+        rootTraversal.addStep(parent.asStep());
+        // NOTE(review): the constructor above already receives this.original; this explicit re-set looks
+        // redundant unless it is needed for child integration — confirm
+        parent.setComputerTraversal(this.original.asAdmin());
+        final TraversalStrategies strategies = new 
DefaultTraversalStrategies();
+        strategies.addStrategies(SingleIterationStrategy.instance());
+        for (final TraversalStrategy strategy : this.otherStrategies) {
+            strategies.addStrategies(strategy);
+        }
+        rootTraversal.setStrategies(strategies);
+        rootTraversal.asAdmin().applyStrategies();
+        assertEquals(this.optimized, parent.computerTraversal.get());
+    }
+
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Iterable<Object[]> generateTestParameters() {
+
+        // each row: {original traversal, expected traversal after optimization, additional strategies to apply}
+        return Arrays.asList(new Object[][]{
+                {__.V().out().count(), __.V().outE().count(), 
Collections.singletonList(AdjacentToIncidentStrategy.instance())},
+                {__.V().id(), __.V().id(), Collections.emptyList()},
+                {__.V().out().count(), __.V().out().count(), 
Collections.emptyList()},
+                {__.V().out().label().count(), __.V().out().label().count(), 
Collections.emptyList()},
+                {__.V().in().id(), __.V().local(__.in().id()), 
Collections.emptyList()},
+                {__.V().out().id(), __.V().local(__.out().id()), 
Collections.emptyList()},
+                {__.V().both().id(), __.V().local(__.both().id()), 
Collections.emptyList()},
+                {__.V().outE().inV().id().count(), 
__.V().local(__.outE().inV().id()).count(), Collections.emptyList()},
+                {__.V().map(__.outE().inV()).count(), 
__.V().map(__.outE().inV()).count(), Collections.emptyList()},
+                {__.V().out().map(__.outE().inV()).count(), 
__.V().out().map(__.outE().inV()).count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).id().count(), 
__.V().outE().map(__.inV()).id().count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).count(), 
__.V().outE().map(__.inV()).count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).values("name").count(), 
__.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()},
+                {__.V().outE().inV().count(), __.V().outE().inV().count(), 
Collections.emptyList()},
+                {__.V().out().id().count(), 
__.V().local(__.out().id()).count(), Collections.emptyList()},
+                {__.V().in().id().count(), __.V().local(__.in().id()).count(), 
Collections.emptyList()},
+                {__.V().in().id().select("id-map").dedup().count(), 
__.V().local(__.in().id().select("id-map")).dedup().count(), 
Collections.emptyList()},
+                {__.V().outE().values("weight"), 
__.V().outE().values("weight"), Collections.emptyList()},
+                {__.V().outE().values("weight").sum(), 
__.V().outE().values("weight").sum(), Collections.emptyList()},
+                {__.V().inE().values("weight"), 
__.V().local(__.inE().values("weight")), Collections.emptyList()},
+                {__.V().inE().values("weight").sum(), 
__.V().local(__.inE().values("weight")).sum(), Collections.emptyList()},
+                {__.V().inE().values("weight").sum().dedup().count(), 
__.V().local(__.inE().values("weight")).sum().dedup().count(), 
Collections.emptyList()},
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
index a4acf4c..2abb9b8 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
@@ -23,12 +23,23 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
 import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectOneStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
@@ -48,9 +59,8 @@ public final class SparkSingleIterationStrategy extends 
AbstractTraversalStrateg
 
     private static final Set<Class> MULTI_ITERATION_CLASSES = new 
HashSet<>(Arrays.asList(
             EdgeVertexStep.class,
-            LambdaMapStep.class, // maybe?
-            LambdaFlatMapStep.class // maybe?
-            // VertexStep is special as you need to see if the return class is 
Edge or Vertex (logic below)
+            LambdaMapStep.class,
+            LambdaFlatMapStep.class
+            // VertexStep is intentionally not listed: whether it message passes depends on whether it returns
+            // vertices or walks non-OUT edges, which is checked separately where VertexSteps are inspected
     ));
 
     private SparkSingleIterationStrategy() {
@@ -63,6 +73,7 @@ public final class SparkSingleIterationStrategy extends 
AbstractTraversalStrateg
             final Traversal.Admin<?, ?> computerTraversal = 
step.generateProgram(graph, 
EmptyMemory.instance()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
+            ///
             boolean doesMessagePass = 
TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, 
MULTI_ITERATION_CLASSES, computerTraversal);
             if (!doesMessagePass) {
                 for (final VertexStep vertexStep : 
TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, 
VertexStep.class, computerTraversal)) {
@@ -72,18 +83,45 @@ public final class SparkSingleIterationStrategy extends 
AbstractTraversalStrateg
                     }
                 }
             }
-            if (!doesMessagePass) {
+            if (!doesMessagePass && 
!SparkSingleIterationStrategy.endsWithInElement(computerTraversal)) {
                 step.setComputer(step.getComputer()
                         // if no message passing, don't partition the loaded 
graph
                         .configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, 
true)
-                                // if no message passing, don't cache the 
loaded graph
+                        // if no message passing, don't cache the loaded graph
                         .configure(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, 
true));
             }
         }
     }
 
+    /**
+     * Checks whether the end of the traversal is determined by a step that moves to an adjacent element.
+     * Such a step is an {@code EdgeVertexStep}, or a {@code VertexStep} that returns vertices or walks
+     * non-{@code OUT} edges. The result is used, together with the message-passing check, to decide whether the
+     * Spark partitioner and graph cache can be skipped.
+     * <p>
+     * The walk starts at the end step and moves backwards.
+     * <ul>
+     * <li>For {@code TraversalMapStep}, {@code TraversalFlatMapStep} and {@code LocalStep} it recurses into the
+     * first local child.</li>
+     * <li>For any other {@code TraversalParent} it recurses into every global child.</li>
+     * <li>It keeps walking only past filter, side-effect, select and barrier steps; it returns {@code false}
+     * at the first step of any other kind.</li>
+     * </ul>
+     *
+     * @param traversal the traversal to inspect
+     * @return {@code true} if such an adjacent-element step was found before the walk stopped
+     */
+    private static final boolean endsWithInElement(final Traversal.Admin<?, ?> 
traversal) {
+        Step<?, ?> current = traversal.getEndStep();
+        while (!(current instanceof EmptyStep)) {
+            if ((current instanceof VertexStep && (((VertexStep) 
current).returnsVertex() ||
+                    !((VertexStep) 
current).getDirection().equals(Direction.OUT))) ||
+                    current instanceof EdgeVertexStep) {
+                return true;
+            } else if (current instanceof TraversalMapStep || current 
instanceof TraversalFlatMapStep || current instanceof LocalStep) {
+                if (endsWithInElement(((TraversalParent) 
current).getLocalChildren().get(0)))
+                    return true;
+            } else if (current instanceof TraversalParent) {
+                if (((TraversalParent) 
current).getGlobalChildren().stream().filter(SparkSingleIterationStrategy::endsWithInElement).findAny().isPresent())
+                    return true;
+            }
+            // only these step types are looked through; any other step ends the search with "no"
+            if (!(current instanceof FilterStep ||
+                    current instanceof SideEffectStep ||
+                    current instanceof SelectStep ||
+                    current instanceof SelectOneStep ||
+                    current instanceof Barrier)) {
+                return false;
+            }
+            current = current.getPreviousStep();
+        }
+        return false;
+    }
+
+    /** Returns the singleton instance of this strategy (the constructor is private). */
     public static SparkSingleIterationStrategy instance() {
         return INSTANCE;
     }
 
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
index 4e43438..20596d7 100644
--- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -32,7 +33,9 @@ import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import 
org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
 
@@ -60,29 +63,93 @@ public class SparkSingleIterationStrategyTest extends 
AbstractSparkTest {
         
configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, 
SparkGraphComputer.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, 
true);
 
+        /////////// WITHOUT SINGLE-ITERATION STRATEGY, FEWER SINGLE-PASS OPTIONS 
ARE AVAILABLE
+
         Graph graph = GraphFactory.open(configuration);
-        GraphTraversalSource g = 
graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class);
+        GraphTraversalSource g = 
graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class,
 SingleIterationStrategy.class);
         
assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
-        
assertFalse(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
+        
assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair
 -> pair.getValue0() instanceof 
SparkInterceptorStrategy).findAny().isPresent());
+        
assertFalse(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
+        
assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair
 -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
         
assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
-        
assertTrue(g.V().count().explain().toString().contains(SparkSingleIterationStrategy.class.getSimpleName()));
+        
assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair 
-> pair.getValue0() instanceof 
SparkSingleIterationStrategy).findAny().isPresent());
 
         test(true, g.V().limit(10));
         test(true, g.V().values("age").groupCount());
         test(true, g.V().groupCount().by(__.out().count()));
         test(true, g.V().outE());
-        test(true, 6l, g.V().count());
-        test(true, 6l, g.V().out().count());
-        test(true, 6l, g.V().local(__.inE()).count());
-        test(true, 6l, g.V().outE().inV().count());
+        test(true, 6L, g.V().count());
+        test(true, 6L, g.V().out().count());
+        test(true, 6L, g.V().outE().inV().count());
         ////
+        test(false, 6L, g.V().local(__.inE()).count());
         test(false, g.V().outE().inV());
         test(false, g.V().both());
-        test(false, 12l, g.V().both().count());
+        test(false, 12L, g.V().both().count());
         test(false, g.V().out().id());
-        test(false, 2l, g.V().out().out().count());
-        test(false, 6l, g.V().in().count());
-        test(false, 6l, g.V().inE().count());
+        test(false, 2L, g.V().out().out().count());
+        test(false, 6L, g.V().in().count());
+        test(false, 6L, g.V().inE().count());
+
+        /////////// WITH SINGLE-ITERATION STRATEGY, MORE SINGLE-PASS OPTIONS 
ARE AVAILABLE
+
+        graph = GraphFactory.open(configuration);
+        g = 
graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(SingleIterationStrategy.instance());
+        
assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
+        
assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair
 -> pair.getValue0() instanceof 
SparkInterceptorStrategy).findAny().isPresent());
+        
assertTrue(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
+        
assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair 
-> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
+        
assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
+        
assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair 
-> pair.getValue0() instanceof 
SparkSingleIterationStrategy).findAny().isPresent());
+
+        test(true, g.V().limit(10));
+        test(true, g.V().values("age").groupCount());
+        test(true, g.V().groupCount().by(__.out().count()));
+        test(true, g.V().outE());
+        test(true, 6L, g.V().outE().values("weight").count());
+        test(true, 6L, g.V().inE().values("weight").count());
+        test(true, 12L, g.V().bothE().values("weight").count());
+        test(true, g.V().bothE().values("weight"));
+        test(true, g.V().bothE().values("weight").limit(2));
+        test(true, 6L, g.V().count());
+        test(true, 6L, g.V().id().count());
+        test(true, 6L, g.V().out().count());
+        test(true, 6L, g.V().outE().inV().count());
+        test(true, 6L, g.V().outE().inV().id().count());
+        test(true, 2L, 
g.V().outE().inV().id().groupCount().select(Column.values).unfold().dedup().count());
+        test(true, g.V().out().id());
+        test(true, 6L, g.V().outE().valueMap().count());
+        test(true, g.V().outE().valueMap());
+        test(true, 6L, g.V().inE().valueMap().count());
+        test(true, g.V().inE().valueMap());
+        test(true, 12L, g.V().bothE().valueMap().count());
+        test(true, g.V().bothE().valueMap());
+        test(true, 6L, g.V().inE().id().count());
+        test(true, 6L, g.V().outE().count());
+        test(true, 4L, g.V().outE().inV().id().dedup().count());
+        test(true, 6L, g.V().as("a").outE().inV().as("b").id().dedup("a", 
"b").by(T.id).count());
+        test(true, 4L, g.V().filter(__.in()).count());
+        test(true, 6L, g.V().sideEffect(__.in()).count());
+        /////
+        test(false, 6L, g.V().local(__.inE()).count());
+        test(false, 6L, g.V().outE().outV().count()); // todo: low probability 
traversal, but it could nonetheless be optimized
+        test(false, 6L, g.V().in().count());
+        test(false, 6L, g.V().flatMap(__.in()).count());
+        test(false, 4L, g.V().map(__.in()).count());
+        test(false, 6L, g.V().local(__.in()).count());
+        test(false, 6L, g.V().inE().count());
+        test(false, g.V().outE().inV());
+        test(false, g.V().both());
+        test(false, 12L, g.V().both().count());
+        test(false, g.V().outE().inV().dedup());
+        test(false, 4L, g.V().outE().inV().dedup().count());
+        test(false, 2L, g.V().out().out().count());
+        test(false, 6L, g.V().as("a").map(__.both()).select("a").count());
+        test(false, g.V().out().values("name"));
+        test(false, g.V().out().properties("name"));
+        test(false, g.V().out().valueMap());
+        test(false, 6L, 
g.V().as("a").outE().inV().values("name").as("b").dedup("a", "b").count());
+        test(false, 2L, 
g.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count());
     }
 
     private static <R> void test(boolean singleIteration, final Traversal<?, 
R> traversal) {

Reply via email to