Added SingleIterationStrategy which is able to rewrite a set of traversals to not use message passing in OLAP. This is significant for all GraphComputers, as message passing is expensive, and furthermore for SparkGraphComputer, as without message passing there is no need to cache or partition the graph once loaded.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f6b66977 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f6b66977 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f6b66977 Branch: refs/heads/TINKERPOP-1602 Commit: f6b669778ec058a555623c6119e0feaaac59c8be Parents: f0875d7 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Jan 26 12:57:58 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Fri Jan 27 14:24:18 2017 -0700 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 3 +- .../step/map/TraversalVertexProgramStep.java | 5 + .../optimization/SingleIterationStrategy.java | 132 +++++++++++++++++++ .../process/traversal/TraversalStrategies.java | 2 + .../SingleIterationStrategyTest.java | 100 ++++++++++++++ .../SparkSingleIterationStrategy.java | 48 ++++++- .../SparkSingleIterationStrategyTest.java | 89 +++++++++++-- 7 files changed, 362 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/CHANGELOG.asciidoc ---------------------------------------------------------------------- diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 95cfb71..860d401 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,7 +26,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -* Fixed a bug where `keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`. +* Fixed a bug where `PathProcessor.keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`. 
+* Added `SingleIterationStrategy` as a default `GraphComputer` strategy that can re-write some traversals to not require message passing. * Fixed a bug associated with user-provided maps and `GroupSideEffectStep`. * `GroupBiOperator` no longer maintains a detached traversal and thus, no more side-effect related OLAP inconsistencies. * Added `ProjectedTraverser` which wraps a traverser with a `List<Object>` of projected data. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java index cb7db29..e866ce2 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java @@ -54,6 +54,11 @@ public final class TraversalVertexProgramStep extends VertexProgramStep implemen return Collections.singletonList(this.computerTraversal.get()); } + public void setComputerTraversal(final Traversal.Admin<?,?> computerTraversal) { + this.computerTraversal = new PureTraversal<>(computerTraversal); + this.integrateChild(this.computerTraversal.get()); + } + @Override public String toString() { return StringFactory.stepString(this, this.computerTraversal.get(), new GraphFilter(this.computer)); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java 
---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java new file mode 100644 index 0000000..efcbe9a --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization; + +import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory; +import org.apache.tinkerpop.gremlin.process.traversal.Scope; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder; +import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; +import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import 
org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class SingleIterationStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy { + + private static final SingleIterationStrategy INSTANCE = new SingleIterationStrategy(); + + private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList( + IncidentToAdjacentStrategy.class, + AdjacentToIncidentStrategy.class, + FilterRankingStrategy.class, + InlineFilterStrategy.class)); + + private static final Set<Class> MULTI_ITERATION_CLASSES = new HashSet<>(Arrays.asList( + EdgeVertexStep.class, + LambdaFlatMapStep.class, + LambdaMapStep.class + )); + + + private SingleIterationStrategy() { + } + + @Override + public void apply(final Traversal.Admin<?, ?> traversal) { + // only process the first traversal step in an OLAP chain + TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> { + final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined + final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone(); + // does the traversal as it is message pass? 
+ boolean doesMessagePass = TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, MULTI_ITERATION_CLASSES, computerTraversal); + if (!doesMessagePass) { + for (final VertexStep vertexStep : TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, VertexStep.class, computerTraversal)) { + if (vertexStep.returnsVertex() || !vertexStep.getDirection().equals(Direction.OUT)) { // in-edges require message pass in OLAP + doesMessagePass = true; + break; + } + } + } // if the traversal doesn't message pass, then don't try and localize it as its just wasted computation + if (doesMessagePass) { + final boolean beyondStarGraph = + TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, LambdaHolder.class, computerTraversal) || + !TraversalHelper.isLocalStarGraph(computerTraversal); + if (!beyondStarGraph && // if we move beyond the star graph, then localization is not possible. + !(computerTraversal.getStartStep().getNextStep() instanceof EmptyStep) && // if its just a g.V()/E(), then don't localize + !(computerTraversal.getStartStep().getNextStep() instanceof LocalStep) && // removes the potential for the infinite recursive application of the traversal + !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) && // if the second step is a barrier, no point in trying to localize anything + !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, computerTraversal). // this is a strict precaution that could be loosed with deeper logic on barriers in global children + stream(). + filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) { + + final Traversal.Admin<?, ?> newComputerTraversal = step.computerTraversal.getPure(); + final Traversal.Admin localTraversal = new DefaultGraphTraversal<>(); + final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, newComputerTraversal).orElse(null); + final Step endStep = null == barrier ? 
newComputerTraversal.getEndStep() : barrier.getPreviousStep(); + if (!(endStep instanceof VertexStep || endStep instanceof EdgeVertexStep)) { + TraversalHelper.removeToTraversal(newComputerTraversal.getStartStep().getNextStep(), null == barrier ? EmptyStep.instance() : barrier, localTraversal); + assert !localTraversal.getSteps().isEmpty(); // given the if() constraints, this is impossible + if (localTraversal.getSteps().size() > 1) { // if its just a single step, a local wrap will not alter its locus of computation + if (null == barrier) + TraversalHelper.insertTraversal(0, (Traversal.Admin) __.local(localTraversal), newComputerTraversal); + else + TraversalHelper.insertTraversal(barrier.getPreviousStep(), (Traversal.Admin) __.local(localTraversal), newComputerTraversal); + step.setComputerTraversal(newComputerTraversal); + } + } + } + } + }); + } + + @Override + public Set<Class<? extends OptimizationStrategy>> applyPrior() { + return PRIORS; + } + + public static SingleIterationStrategy instance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java index 015df70..a1cb99c 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.traversal; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy; +import 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; @@ -222,6 +223,7 @@ public interface TraversalStrategies extends Serializable, Cloneable { final TraversalStrategies graphComputerStrategies = new DefaultTraversalStrategies(); graphComputerStrategies.addStrategies( GraphFilterStrategy.instance(), + SingleIterationStrategy.instance(), OrderLimitStrategy.instance(), PathProcessorStrategy.instance(), ComputerVerificationStrategy.instance()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java new file mode 100644 index 0000000..612fb9d --- /dev/null +++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization; + +import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +@RunWith(Parameterized.class) +public class SingleIterationStrategyTest { + + @Parameterized.Parameter(value = 0) + public Traversal original; + + @Parameterized.Parameter(value = 1) + public Traversal optimized; + + @Parameterized.Parameter(value = 2) + public Collection<TraversalStrategy> otherStrategies; + + @Test + public void doTest() { + final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>(); + final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin()); + rootTraversal.addStep(parent.asStep()); + parent.setComputerTraversal(this.original.asAdmin()); + final TraversalStrategies strategies = new DefaultTraversalStrategies(); + strategies.addStrategies(SingleIterationStrategy.instance()); + for (final TraversalStrategy strategy : this.otherStrategies) { + strategies.addStrategies(strategy); + } + rootTraversal.setStrategies(strategies); + rootTraversal.asAdmin().applyStrategies(); + assertEquals(this.optimized, parent.computerTraversal.get()); + } + + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> generateTestParameters() { + + return Arrays.asList(new Object[][]{ + {__.V().out().count(), __.V().outE().count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())}, + {__.V().id(), __.V().id(), Collections.emptyList()}, + {__.V().out().count(), __.V().out().count(), Collections.emptyList()}, + {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()}, + {__.V().in().id(), __.V().local(__.in().id()), Collections.emptyList()}, + {__.V().out().id(), __.V().local(__.out().id()), Collections.emptyList()}, + {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()}, + {__.V().outE().inV().id().count(), __.V().local(__.outE().inV().id()).count(), Collections.emptyList()}, + {__.V().map(__.outE().inV()).count(), __.V().map(__.outE().inV()).count(), 
Collections.emptyList()}, + {__.V().out().map(__.outE().inV()).count(), __.V().out().map(__.outE().inV()).count(), Collections.emptyList()}, + {__.V().outE().map(__.inV()).id().count(), __.V().outE().map(__.inV()).id().count(), Collections.emptyList()}, + {__.V().outE().map(__.inV()).count(), __.V().outE().map(__.inV()).count(), Collections.emptyList()}, + {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()}, + {__.V().outE().inV().count(), __.V().outE().inV().count(), Collections.emptyList()}, + {__.V().out().id().count(), __.V().local(__.out().id()).count(), Collections.emptyList()}, + {__.V().in().id().count(), __.V().local(__.in().id()).count(), Collections.emptyList()}, + {__.V().in().id().select("id-map").dedup().count(), __.V().local(__.in().id().select("id-map")).dedup().count(), Collections.emptyList()}, + {__.V().outE().values("weight"), __.V().outE().values("weight"), Collections.emptyList()}, + {__.V().outE().values("weight").sum(), __.V().outE().values("weight").sum(), Collections.emptyList()}, + {__.V().inE().values("weight"), __.V().local(__.inE().values("weight")), Collections.emptyList()}, + {__.V().inE().values("weight").sum(), __.V().local(__.inE().values("weight")).sum(), Collections.emptyList()}, + {__.V().inE().values("weight").sum().dedup().count(), __.V().local(__.inE().values("weight")).sum().dedup().count(), Collections.emptyList()}, + }); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java index a4acf4c..2abb9b8 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java @@ -23,12 +23,23 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory; import org.apache.tinkerpop.gremlin.process.traversal.Scope; +import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; +import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectOneStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep; +import 
org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; import org.apache.tinkerpop.gremlin.structure.Direction; @@ -48,9 +59,8 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg private static final Set<Class> MULTI_ITERATION_CLASSES = new HashSet<>(Arrays.asList( EdgeVertexStep.class, - LambdaMapStep.class, // maybe? - LambdaFlatMapStep.class // maybe? - // VertexStep is special as you need to see if the return class is Edge or Vertex (logic below) + LambdaMapStep.class, + LambdaFlatMapStep.class )); private SparkSingleIterationStrategy() { @@ -63,6 +73,7 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone(); if (!computerTraversal.isLocked()) computerTraversal.applyStrategies(); + /// boolean doesMessagePass = TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, MULTI_ITERATION_CLASSES, computerTraversal); if (!doesMessagePass) { for (final VertexStep vertexStep : TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, VertexStep.class, computerTraversal)) { @@ -72,18 +83,45 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg } } } - if (!doesMessagePass) { + if (!doesMessagePass && !SparkSingleIterationStrategy.endsWithInElement(computerTraversal)) { step.setComputer(step.getComputer() // if no message passing, don't partition the loaded graph .configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true) - // if no message passing, don't cache the loaded graph + // if no message passing, don't cache the loaded graph 
.configure(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, true)); } } } + private static final boolean endsWithInElement(final Traversal.Admin<?, ?> traversal) { + Step<?, ?> current = traversal.getEndStep(); + while (!(current instanceof EmptyStep)) { + if ((current instanceof VertexStep && (((VertexStep) current).returnsVertex() || + !((VertexStep) current).getDirection().equals(Direction.OUT))) || + current instanceof EdgeVertexStep) { + return true; + } else if (current instanceof TraversalMapStep || current instanceof TraversalFlatMapStep || current instanceof LocalStep) { + if (endsWithInElement(((TraversalParent) current).getLocalChildren().get(0))) + return true; + } else if (current instanceof TraversalParent) { + if (((TraversalParent) current).getGlobalChildren().stream().filter(SparkSingleIterationStrategy::endsWithInElement).findAny().isPresent()) + return true; + } + if (!(current instanceof FilterStep || + current instanceof SideEffectStep || + current instanceof SelectStep || + current instanceof SelectOneStep || + current instanceof Barrier)) { + return false; + } + current = current.getPreviousStep(); + } + return false; + } + public static SparkSingleIterationStrategy instance() { return INSTANCE; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java index 4e43438..20596d7 100644 --- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java @@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; @@ -32,7 +33,9 @@ import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; +import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.junit.Test; @@ -60,29 +63,93 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest { configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName()); configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); + /////////// WITHOUT SINGLE-ITERATION STRATEGY LESS SINGLE-PASS OPTIONS ARE AVAILABLE + Graph graph = GraphFactory.open(configuration); - GraphTraversalSource g = 
graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class); + GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, SingleIterationStrategy.class); assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance())); - assertFalse(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName())); + assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent()); + assertFalse(g.getStrategies().toList().contains(SingleIterationStrategy.instance())); + assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent()); assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance())); - assertTrue(g.V().count().explain().toString().contains(SparkSingleIterationStrategy.class.getSimpleName())); + assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent()); test(true, g.V().limit(10)); test(true, g.V().values("age").groupCount()); test(true, g.V().groupCount().by(__.out().count())); test(true, g.V().outE()); - test(true, 6l, g.V().count()); - test(true, 6l, g.V().out().count()); - test(true, 6l, g.V().local(__.inE()).count()); - test(true, 6l, g.V().outE().inV().count()); + test(true, 6L, g.V().count()); + test(true, 6L, g.V().out().count()); + test(true, 6L, g.V().outE().inV().count()); //// + test(false, 6L, g.V().local(__.inE()).count()); test(false, g.V().outE().inV()); test(false, g.V().both()); - test(false, 12l, g.V().both().count()); + test(false, 12L, g.V().both().count()); test(false, g.V().out().id()); - test(false, 2l, g.V().out().out().count()); - test(false, 6l, g.V().in().count()); - test(false, 6l, g.V().inE().count()); + 
test(false, 2L, g.V().out().out().count()); + test(false, 6L, g.V().in().count()); + test(false, 6L, g.V().inE().count()); + + /////////// WITH SINGLE-ITERATION STRATEGY MORE SINGLE-PASS OPTIONS ARE AVAILABLE + + graph = GraphFactory.open(configuration); + g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(SingleIterationStrategy.instance()); + assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance())); + assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent()); + assertTrue(g.getStrategies().toList().contains(SingleIterationStrategy.instance())); + assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent()); + assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance())); + assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent()); + + test(true, g.V().limit(10)); + test(true, g.V().values("age").groupCount()); + test(true, g.V().groupCount().by(__.out().count())); + test(true, g.V().outE()); + test(true, 6L, g.V().outE().values("weight").count()); + test(true, 6L, g.V().inE().values("weight").count()); + test(true, 12L, g.V().bothE().values("weight").count()); + test(true, g.V().bothE().values("weight")); + test(true, g.V().bothE().values("weight").limit(2)); + test(true, 6L, g.V().count()); + test(true, 6L, g.V().id().count()); + test(true, 6L, g.V().out().count()); + test(true, 6L, g.V().outE().inV().count()); + test(true, 6L, g.V().outE().inV().id().count()); + test(true, 2L, g.V().outE().inV().id().groupCount().select(Column.values).unfold().dedup().count()); + test(true, g.V().out().id()); + test(true, 6L, g.V().outE().valueMap().count()); + test(true, 
g.V().outE().valueMap()); + test(true, 6L, g.V().inE().valueMap().count()); + test(true, g.V().inE().valueMap()); + test(true, 12L, g.V().bothE().valueMap().count()); + test(true, g.V().bothE().valueMap()); + test(true, 6L, g.V().inE().id().count()); + test(true, 6L, g.V().outE().count()); + test(true, 4L, g.V().outE().inV().id().dedup().count()); + test(true, 6L, g.V().as("a").outE().inV().as("b").id().dedup("a", "b").by(T.id).count()); + test(true, 4L, g.V().filter(__.in()).count()); + test(true, 6L, g.V().sideEffect(__.in()).count()); + ///// + test(false, 6L, g.V().local(__.inE()).count()); + test(false, 6L, g.V().outE().outV().count()); // todo: low probability traversal, but none the less could be optimized for + test(false, 6L, g.V().in().count()); + test(false, 6L, g.V().flatMap(__.in()).count()); + test(false, 4L, g.V().map(__.in()).count()); + test(false, 6L, g.V().local(__.in()).count()); + test(false, 6L, g.V().inE().count()); + test(false, g.V().outE().inV()); + test(false, g.V().both()); + test(false, 12L, g.V().both().count()); + test(false, g.V().outE().inV().dedup()); + test(false, 4L, g.V().outE().inV().dedup().count()); + test(false, 2L, g.V().out().out().count()); + test(false, 6L, g.V().as("a").map(__.both()).select("a").count()); + test(false, g.V().out().values("name")); + test(false, g.V().out().properties("name")); + test(false, g.V().out().valueMap()); + test(false, 6L, g.V().as("a").outE().inV().values("name").as("b").dedup("a", "b").count()); + test(false, 2L, g.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count()); } private static <R> void test(boolean singleIteration, final Traversal<?, R> traversal) {