Removed a PathProcessor.ID constraint from ComputerVerificationStrategy. Moreover, sampling and ordering are more efficient as the projected data is co-located with the traverser in the new ProjectedTraverser wrapper. Going to leave it at this for tp32/... Moving forward, we can make it so we don't need to call DetachedFactory.detach(t, true) for CollectingBarrierStep by maintaining 'future data.' It's complicated and I don't want to introduce potential bugs.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b2f0c57d Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b2f0c57d Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b2f0c57d Branch: refs/heads/TINKERPOP-1565 Commit: b2f0c57df6fd9191904213622ae718a0790d7a03 Parents: 5045f67 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Jan 18 11:07:32 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Wed Jan 18 11:07:32 2017 -0700 ---------------------------------------------------------------------- .../traversal/step/filter/SampleGlobalStep.java | 19 ++++++++++++-- .../traversal/step/map/OrderGlobalStep.java | 27 +++++--------------- .../step/util/CollectingBarrierStep.java | 24 ++++++++++------- .../ComputerVerificationStrategy.java | 8 ------ .../traversal/traverser/ProjectedTraverser.java | 16 +++++++----- .../gremlin/util/function/MultiComparator.java | 14 +++++++--- 6 files changed, 60 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java index 0a4da58..2b2cf20 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java @@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal; import 
org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.ProjectedTraverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil; @@ -64,6 +65,15 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen } @Override + public void processAllStarts() { + if (this.starts.hasNext()) { + while (this.starts.hasNext()) { + this.traverserSet.add(this.createProjectedTraverser(this.starts.next())); + } + } + } + + @Override public void barrierConsumer(final TraverserSet<S> traverserSet) { // return the entire traverser set if the set is smaller than the amount to sample if (traverserSet.bulkSize() <= this.amountToSample) @@ -71,7 +81,7 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen //////////////// else sample the set double totalWeight = 0.0d; for (final Traverser.Admin<S> s : traverserSet) { - totalWeight = totalWeight + TraversalUtil.apply(s, this.probabilityTraversal).doubleValue() * s.bulk(); + totalWeight = totalWeight + (((ProjectedTraverser<S, Number>) s).getProjections().get(0).doubleValue() * s.bulk()); } /////// final TraverserSet<S> sampledSet = new TraverserSet<>(); @@ -82,7 +92,7 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen for (final Traverser.Admin<S> s : traverserSet) { long sampleBulk = sampledSet.contains(s) ? 
sampledSet.get(s).bulk() : 0; if (sampleBulk < s.bulk()) { - final double currentWeight = TraversalUtil.apply(s, this.probabilityTraversal).doubleValue(); + final double currentWeight = ((ProjectedTraverser<S, Number>) s).getProjections().get(0).doubleValue(); for (int i = 0; i < (s.bulk() - sampleBulk); i++) { runningWeight = runningWeight + currentWeight; if (RANDOM.nextDouble() <= ((runningWeight / totalWeight))) { @@ -104,6 +114,11 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen traverserSet.addAll(sampledSet); } + + private final ProjectedTraverser<S, Number> createProjectedTraverser(final Traverser.Admin<S> traverser) { + return new ProjectedTraverser<>(traverser, Collections.singletonList(TraversalUtil.apply(traverser, this.probabilityTraversal))); + } + @Override public Set<TraverserRequirement> getRequirements() { return this.getSelfAndChildRequirements(TraverserRequirement.BULK); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java index 9c071f1..55d8650 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java @@ -73,22 +73,11 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa this.multiComparator = this.createMultiComparator(); if (this.starts.hasNext()) { while (this.starts.hasNext()) { - this.traverserSet.add(this.createOrderedTraverser(this.starts.next())); + 
this.traverserSet.add(this.createProjectedTraverser(this.starts.next())); } - this.barrierConsumer(this.traverserSet); } } - @Override - public Traverser.Admin<S> processNextStart() { - if (!this.traverserSet.isEmpty()) { - return this.traverserSet.remove(); - } else if (this.starts.hasNext()) { - this.processAllStarts(); - } - return ((ProjectedTraverser) this.traverserSet.remove()).getInternal(); - } - public void setLimit(final long limit) { this.limit = limit; } @@ -162,18 +151,18 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa public MemoryComputeKey<TraverserSet<S>> getMemoryComputeKey() { if (null == this.multiComparator) this.multiComparator = this.createMultiComparator(); - return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.limit, this.isShuffle, this.multiComparator), false, true); + return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.limit, this.multiComparator), false, true); } - private ProjectedTraverser<S> createOrderedTraverser(final Traverser.Admin<S> traverser) { + private final ProjectedTraverser<S,Object> createProjectedTraverser(final Traverser.Admin<S> traverser) { final List<Object> projections = new ArrayList<>(this.comparators.size()); for (final Pair<Traversal.Admin<S, C>, Comparator<C>> pair : this.comparators) { projections.add(TraversalUtil.apply(traverser, pair.getValue0())); } - return new ProjectedTraverser<S>(traverser, projections); + return new ProjectedTraverser<>(traverser, projections); } - private MultiComparator<C> createMultiComparator() { + private final MultiComparator<C> createMultiComparator() { final List<Comparator<C>> list = new ArrayList<>(this.comparators.size()); for (final Pair<Traversal.Admin<S, C>, Comparator<C>> pair : this.comparators) { list.add(pair.getValue1()); @@ -186,16 +175,14 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa public static final class OrderBiOperator<S> implements 
BinaryOperator<TraverserSet<S>>, Serializable { private long limit; - private boolean isShuffle; private MultiComparator comparator; private OrderBiOperator() { // for serializers that need a no-arg constructor } - public OrderBiOperator(final long limit, final boolean isShuffle, final MultiComparator multiComparator) { + public OrderBiOperator(final long limit, final MultiComparator multiComparator) { this.limit = limit; - this.isShuffle = isShuffle; this.comparator = multiComparator; } @@ -203,7 +190,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) { setA.addAll(setB); if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) { - if (this.isShuffle) + if (this.comparator.isShuffle()) setA.shuffle(); else setA.sort(this.comparator); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java index b0cce80..f99201d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java @@ -23,11 +23,13 @@ import org.apache.tinkerpop.gremlin.process.traversal.Operator; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.ProjectedTraverser; import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.util.Collections; import java.util.NoSuchElementException; @@ -40,7 +42,8 @@ import java.util.function.BinaryOperator; public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implements Barrier<TraverserSet<S>> { protected TraverserSet<S> traverserSet = new TraverserSet<>(); - protected int maxBarrierSize; + private int maxBarrierSize; + private boolean barrierConsumed = false; public CollectingBarrierStep(final Traversal.Admin traversal) { this(traversal, Integer.MAX_VALUE); @@ -68,7 +71,6 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem this.traverserSet.add(this.starts.next()); } } - this.barrierConsumer(this.traverserSet); } } @@ -85,11 +87,10 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem throw FastNoSuchElementException.instance(); else { final TraverserSet<S> temp = new TraverserSet<>(); - this.traverserSet.iterator().forEachRemaining(t -> { + IteratorUtils.removeOnNext(this.traverserSet.iterator()).forEachRemaining(t -> { DetachedFactory.detach(t, true); // this should be dynamic temp.add(t); }); - this.traverserSet.clear(); return temp; } } @@ -98,23 +99,28 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem public void addBarrier(final TraverserSet<S> barrier) { this.traverserSet = barrier; this.traverserSet.forEach(traverser -> traverser.setSideEffects(this.getTraversal().getSideEffects())); - this.barrierConsumer(this.traverserSet); + this.barrierConsumed = false; } 
@Override public Traverser.Admin<S> processNextStart() { - if (!this.traverserSet.isEmpty()) { - return this.traverserSet.remove(); - } else if (this.starts.hasNext()) { + if (this.traverserSet.isEmpty() && this.starts.hasNext()) { this.processAllStarts(); + this.barrierConsumed = false; + } + // + if (!this.barrierConsumed) { + this.barrierConsumer(this.traverserSet); + this.barrierConsumed = true; } - return this.traverserSet.remove(); + return ProjectedTraverser.tryUnwrap(this.traverserSet.remove()); } @Override public CollectingBarrierStep<S> clone() { final CollectingBarrierStep<S> clone = (CollectingBarrierStep<S>) super.clone(); clone.traverserSet = new TraverserSet<>(); + clone.barrierConsumed = false; return clone; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java index 5777adb..ef9b95c 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java @@ -28,11 +28,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating; import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; -import org.apache.tinkerpop.gremlin.process.traversal.step.filter.SampleGlobalStep; import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep; import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep; import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep; import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; @@ -87,12 +85,6 @@ public final class ComputerVerificationStrategy extends AbstractTraversalStrateg throw new VerificationException("Local traversals may not traverse past the local star-graph on GraphComputer: " + traversalOptional.get(), traversal); } - // sample step use can only operate on the element and its properties (no incidences) - if (step instanceof SampleGlobalStep) { - if (((TraversalParent) step).getLocalChildren().stream().filter(t -> !TraversalHelper.isLocalProperties(t)).findAny().isPresent()) - throw new VerificationException("The following barrier step can not process the incident edges of a vertex on GraphComputer: " + step, traversal); - } - // this is a problem because sideEffect.merge() is transient on the OLAP reduction if (TraversalHelper.getRootTraversal(traversal).getTraverserRequirements().contains(TraverserRequirement.ONE_BULK)) throw new VerificationException("One bulk is currently not supported on GraphComputer: " + step, traversal); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java index 67e723a..5cecdc4 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java @@ -32,16 +32,16 @@ import java.util.function.Function; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class ProjectedTraverser<T> implements Traverser.Admin<T> { +public final class ProjectedTraverser<T, P> implements Traverser.Admin<T> { private Traverser.Admin<T> internal; - private List<Object> projections; + private List<P> projections; private ProjectedTraverser() { // for serialization } - public ProjectedTraverser(final Traverser.Admin<T> internal, final List<Object> projections) { + public ProjectedTraverser(final Traverser.Admin<T> internal, final List<P> projections) { this.internal = internal; this.projections = projections; } @@ -51,7 +51,7 @@ public final class ProjectedTraverser<T> implements Traverser.Admin<T> { return this.internal; } - public List<Object> getProjections() { + public List<P> getProjections() { return this.projections; } @@ -187,13 +187,17 @@ public final class ProjectedTraverser<T> implements Traverser.Admin<T> { } @Override - public ProjectedTraverser<T> clone() { + public ProjectedTraverser<T, P> clone() { try { - final ProjectedTraverser<T> clone = (ProjectedTraverser<T>) super.clone(); + final ProjectedTraverser<T, P> clone = (ProjectedTraverser<T, P>) super.clone(); clone.internal = (Traverser.Admin<T>) this.internal.clone(); return clone; } catch (final CloneNotSupportedException e) { throw new IllegalStateException(e.getMessage(), e); } } + + public static <T> Traverser.Admin<T> tryUnwrap(final Traverser.Admin<T> traverser) { + return traverser instanceof ProjectedTraverser ? 
((ProjectedTraverser) traverser).getInternal() : traverser; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java index 427aa3d..b7176ab 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java @@ -32,9 +32,11 @@ import java.util.List; public final class MultiComparator<C> implements Comparator<C>, Serializable { private final List<Comparator> comparators; + private final boolean isShuffle; public MultiComparator(final List<Comparator<C>> comparators) { this.comparators = (List) comparators; + this.isShuffle = !this.comparators.isEmpty() && Order.shuffle == this.comparators.get(this.comparators.size() - 1); } @Override @@ -43,14 +45,20 @@ public final class MultiComparator<C> implements Comparator<C>, Serializable { return Order.incr.compare(objectA, objectB); } else { for (int i = 0; i < this.comparators.size(); i++) { - final int comparison = this.comparators.get(i).compare(this.getObject(objectA, i), this.getObject(objectB, i)); - if (comparison != 0) - return comparison; + if (Order.shuffle != this.comparators.get(i)) { + final int comparison = this.comparators.get(i).compare(this.getObject(objectA, i), this.getObject(objectB, i)); + if (comparison != 0) + return comparison; + } } return 0; } } + public boolean isShuffle() { + return this.isShuffle; + } + private final Object getObject(final C object, final int index) { if (object instanceof ProjectedTraverser) return ((ProjectedTraverser) object).getProjections().get(index);