Repository: incubator-tinkerpop Updated Branches: refs/heads/master afd40488f -> 6113c9283
Fixed the NPE that occurs in OLAP when you have a local/OLTP group() nested within an OLAP group(). The solution is elegant, but the problem is that it's not as efficient as the code was when we had the NPE... darn. Going to fiddle some more tomorrow to see if I can get it faster --- a 600ms vs. 400ms difference. Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c890ceba Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c890ceba Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c890ceba Branch: refs/heads/master Commit: c890cebad5231cdab8744878bebb4ea363689a3a Parents: 44d40f6 Author: Marko A. Rodriguez <[email protected]> Authored: Mon May 23 16:24:58 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Mon May 23 16:24:58 2016 -0600 ---------------------------------------------------------------------- .../process/traversal/step/map/GroupStep.java | 210 ++++++++++--------- .../step/sideEffect/GroupSideEffectStep.java | 59 ++---- .../step/sideEffect/GroovyGroupTest.groovy | 5 + .../traversal/step/sideEffect/GroupTest.java | 42 ++++ .../SparkStarBarrierInterceptor.java | 1 - .../structure/TinkerGraphPlayTest.java | 4 +- 6 files changed, 176 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c890ceba/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java index 4f5df35..b430d8f 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java +++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java @@ -29,7 +29,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal; import org.apache.tinkerpop.gremlin.process.traversal.lambda.TokenTraversal; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating; -import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; @@ -39,6 +38,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import org.javatuples.Pair; import java.io.Serializable; import java.util.ArrayList; @@ -46,25 +46,24 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.BinaryOperator; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> implements ByModulating, TraversalParent, GraphComputing { +public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> implements ByModulating, TraversalParent { private char state = 'k'; private Traversal.Admin<S, K> keyTraversal = null; - private Traversal.Admin<S, V> valueTraversal = this.integrateChild(__.fold().asAdmin()); - private Traversal.Admin<S, ?> preTraversal = null; // used in OLAP - private ReducingBarrierStep reducingBarrierStep = null; // used in OLAP - private boolean onGraphComputer = false; + private Traversal.Admin<S, ?> preTraversal; + private Traversal.Admin<S, V> valueTraversal; public GroupStep(final Traversal.Admin traversal) { super(traversal); - this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal, this.onGraphComputer)); + this.valueTraversal = this.integrateChild(__.fold().asAdmin()); + this.preTraversal = this.integrateChild(splitOnBarrierStep(this.valueTraversal).get(0)); + this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal)); this.setSeedSupplier(HashMapSupplier.instance()); } @@ -75,7 +74,8 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> this.state = 'v'; } else if ('v' == this.state) { this.valueTraversal = this.integrateChild(convertValueTraversal(kvTraversal)); - this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal, this.onGraphComputer)); + this.preTraversal = this.integrateChild(splitOnBarrierStep(this.valueTraversal).get(0)); + this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal)); this.state = 'x'; } else { throw new IllegalStateException("The key and value traversals for group()-step have already been set: " + this); @@ -85,21 +85,11 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> @Override public Map<K, V> projectTraverser(final Traverser.Admin<S> 
traverser) { final Map<K, V> map = new HashMap<>(1); - final K key = TraversalUtil.applyNullable(traverser, this.keyTraversal); - if (this.onGraphComputer) { - if (null == this.reducingBarrierStep) { - final TraverserSet traverserSet = new TraverserSet(); - this.preTraversal.reset(); - this.preTraversal.addStart(traverser.split()); - this.preTraversal.getEndStep().forEachRemaining(traverserSet::add); - map.put(key, (V) traverserSet); - } else { - this.valueTraversal.reset(); - this.valueTraversal.addStart(traverser.split()); - map.put(key, (V) this.reducingBarrierStep.nextBarrier()); - } - } else - map.put(key, (V) traverser); + final TraverserSet traverserSet = new TraverserSet<>(); + this.preTraversal.reset(); + this.preTraversal.addStart(traverser.split()); + this.preTraversal.getEndStep().forEachRemaining(traverserSet::add); + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); return map; } @@ -128,11 +118,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> if (null != this.keyTraversal) clone.keyTraversal = this.keyTraversal.clone(); clone.valueTraversal = this.valueTraversal.clone(); - if (null != this.preTraversal) - clone.preTraversal = this.preTraversal.clone(); - final Optional<Barrier> optional = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal); - if (optional.isPresent() && optional.get() instanceof ReducingBarrierStep) - clone.reducingBarrierStep = (ReducingBarrierStep) optional.get(); + clone.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(clone.valueTraversal).get(0)); return clone; } @@ -154,37 +140,19 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> @Override public Map<K, V> generateFinalResult(final Map<K, V> object) { - return GroupStep.doFinalReduction((Map<K, Object>) object, this.valueTraversal, this.onGraphComputer); - } - - @Override - public void onGraphComputer() { - this.preTraversal = 
this.integrateChild(splitOnBarrierStep(this.valueTraversal).get(0)); - final Optional<Barrier> optional = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal); - if (optional.isPresent() && optional.get() instanceof ReducingBarrierStep) - this.reducingBarrierStep = (ReducingBarrierStep) optional.get(); - this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal, this.onGraphComputer = true)); + return GroupStep.doFinalReduction((Map<K, Object>) object, this.valueTraversal); } /////////////////////// public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable { - private boolean onGraphComputer; - private BinaryOperator reducingBinaryOperator; // OLAP (w/ reducer) - private transient Traversal.Admin<?, V> valueTraversal; // OLTP - private transient Map<K, Integer> counters; // OLTP + private Traversal.Admin<?, V> valueTraversal; + private ReducingBarrierStep reducingBarrierStep = null; - public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal, final boolean onGraphComputer) { - this.onGraphComputer = onGraphComputer; - if (this.onGraphComputer) { - final Optional<Barrier> optional = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal); - if (optional.isPresent() && optional.get() instanceof ReducingBarrierStep) - this.reducingBinaryOperator = ((ReducingBarrierStep) optional.get()).getBiOperator(); - } else { - this.valueTraversal = valueTraversal; - this.counters = new HashMap<>(); - } + public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal) { + this.valueTraversal = valueTraversal.clone(); + this.reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(ReducingBarrierStep.class, this.valueTraversal).orElse(null); } public GroupBiOperator() { @@ -194,43 +162,84 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> @Override public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB) { for (final K 
key : mapB.keySet()) { - if (this.onGraphComputer) { - final Object objectB = mapB.get(key); - if (null != this.reducingBinaryOperator) { - // OLAP -- if there is a mid-traversal, apply the binary reducer and propagate the mutating barrier - final Object objectA = mapA.get(key); - mapA.put(key, (V) (null == objectA ? objectB : this.reducingBinaryOperator.apply(objectA, objectB))); - } else { - // OLAP -- if there is no mid-traversal reducer, aggregate pre-barrier traversers into a traverser set (expensive, but that's that) - final Object objectA = mapA.get(key); - final TraverserSet traverserSet; - if (null == objectA) { - traverserSet = new TraverserSet(); - mapA.put(key, (V) traverserSet); - } else - traverserSet = (TraverserSet) objectA; - traverserSet.addAll((TraverserSet) objectB); - } + Object objectA = mapA.get(key); + final Object objectB = mapB.get(key); + assert null != objectB; + if (null == objectA) { + objectA = objectB; } else { - // OLTP -- do mid-barrier reductions if they exist, else don't. Bulking is also available here because of addStart() prior to barrier. 
- final Traverser.Admin traverser = (Traverser.Admin) mapB.get(key); - Traversal.Admin valueTraversalClone = (Traversal.Admin) mapA.get(key); - if (null == valueTraversalClone) { - this.counters.put(key, 0); - valueTraversalClone = this.valueTraversal.clone(); - mapA.put(key, (V) valueTraversalClone); - } - valueTraversalClone.addStart(traverser); - if (this.counters.compute(key, (k, i) -> ++i) > 1000) { - this.counters.put(key, 0); - TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversalClone).ifPresent(Barrier::processAllStarts); + if (objectA instanceof TraverserSet) { + if (objectB instanceof TraverserSet) { + final TraverserSet set = (TraverserSet) objectA; + set.addAll((TraverserSet) objectB); + if (null != this.reducingBarrierStep && set.size() > 1000) { + this.valueTraversal.reset(); + this.reducingBarrierStep.addStarts(set.iterator()); + objectA = this.reducingBarrierStep.nextBarrier(); + } + } else if (objectB instanceof Pair) { + final TraverserSet set = (TraverserSet) objectA; + set.addAll((TraverserSet) ((Pair) objectB).getValue0()); + if (set.size() > 1000) { + this.valueTraversal.reset(); + this.reducingBarrierStep.addStarts(set.iterator()); + this.reducingBarrierStep.addBarrier(((Pair) objectB).getValue1()); + objectA = this.reducingBarrierStep.nextBarrier(); + } else { + objectA = Pair.with(set, ((Pair) objectB).getValue1()); + } + } else { + objectA = Pair.with(objectA, objectB); + } + } else if (objectA instanceof Pair) { + if (objectB instanceof TraverserSet) { + final TraverserSet set = (TraverserSet) ((Pair) objectA).getValue0(); + set.addAll((TraverserSet) objectB); + if (null != this.reducingBarrierStep &&set.size() > 1000) { + this.valueTraversal.reset(); + this.reducingBarrierStep.addStarts(set.iterator()); + this.reducingBarrierStep.addBarrier(((Pair) objectA).getValue1()); + objectA = this.reducingBarrierStep.nextBarrier(); + } + } else if (objectB instanceof Pair) { + this.valueTraversal.reset(); + 
this.reducingBarrierStep.addBarrier(((Pair) objectA).getValue1()); + this.reducingBarrierStep.addBarrier(((Pair) objectB).getValue1()); + this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); + this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); + objectA = this.reducingBarrierStep.nextBarrier(); + } else { + this.valueTraversal.reset(); + this.reducingBarrierStep.addBarrier(((Pair) objectA).getValue1()); + this.reducingBarrierStep.addBarrier(objectB); + this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); + objectA = this.reducingBarrierStep.nextBarrier(); + } + } else { + if (objectB instanceof TraverserSet) { + objectA = Pair.with(objectB, objectA); + } else if (objectB instanceof Pair) { + this.valueTraversal.reset(); + this.reducingBarrierStep.addBarrier(objectA); + this.reducingBarrierStep.addBarrier(((Pair) objectB).getValue1()); + this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); + objectA = this.reducingBarrierStep.nextBarrier(); + } else { + this.valueTraversal.reset(); + this.reducingBarrierStep.addBarrier(objectA); + this.reducingBarrierStep.addBarrier(objectB); + objectA = this.reducingBarrierStep.nextBarrier(); + } } } + mapA.put(key, (V) objectA); } return mapA; + } } + /////////////////////// public static <S, E> Traversal.Admin<S, E> convertValueTraversal(final Traversal.Admin<S, E> valueTraversal) { @@ -263,26 +272,25 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> } } - public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, final Traversal.Admin<?, V> valueTraversal, final boolean onGraphComputer) { + public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, final Traversal.Admin<?, V> valueTraversal) { final Map<K, V> reducedMap = new HashMap<>(map.size()); - // if not on OLAP, who cares --- don't waste time 
computing barriers - final boolean hasReducingBarrier = onGraphComputer && - TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).isPresent() && - TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).get() instanceof ReducingBarrierStep; - final Traversal.Admin<?, ?> postTraversal = (onGraphComputer & !hasReducingBarrier) ? splitOnBarrierStep(valueTraversal.clone()).get(1) : null; + final ReducingBarrierStep reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(ReducingBarrierStep.class, valueTraversal).orElse(null); IteratorUtils.removeOnNext(map.entrySet().iterator()).forEachRemaining(entry -> { - if (onGraphComputer) { - if (hasReducingBarrier) { // OLAP with reduction (barrier) - valueTraversal.reset(); - TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).get().addBarrier(entry.getValue()); - reducedMap.put(entry.getKey(), valueTraversal.next()); - } else { // OLAP without reduction (traverser set) - postTraversal.reset(); - postTraversal.addStarts(((TraverserSet) entry.getValue()).iterator()); - reducedMap.put(entry.getKey(), (V) postTraversal.next()); - } - } else // OLTP is just a traversal - reducedMap.put(entry.getKey(), ((Traversal.Admin<?, V>) entry.getValue()).next()); + valueTraversal.reset(); + if (null == reducingBarrierStep) { + reducedMap.put(entry.getKey(), entry.getValue() instanceof TraverserSet ? 
+ ((TraverserSet<V>) entry.getValue()).iterator().next().get() : + (V) entry.getValue()); + } else { + if (entry.getValue() instanceof TraverserSet) + reducingBarrierStep.addStarts(((TraverserSet) entry.getValue()).iterator()); + else if (entry.getValue() instanceof Pair) { + reducingBarrierStep.addStarts(((TraverserSet) (((Pair) entry.getValue()).getValue0())).iterator()); + reducingBarrierStep.addBarrier((((Pair) entry.getValue()).getValue1())); + } else + reducingBarrierStep.addBarrier(entry.getValue()); + reducedMap.put(entry.getKey(), valueTraversal.next()); + } }); assert map.isEmpty(); map.clear(); @@ -290,3 +298,5 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> return (Map<K, V>) map; } } + + http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c890ceba/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java index a906312..4fc4ffa 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java @@ -21,16 +21,12 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating; -import 
org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier; @@ -39,27 +35,26 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implements SideEffectCapable<Map<K, ?>, Map<K, V>>, TraversalParent, ByModulating, GraphComputing { +public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implements SideEffectCapable<Map<K, ?>, Map<K, V>>, TraversalParent, ByModulating { private char state = 'k'; private Traversal.Admin<S, K> keyTraversal = null; + private Traversal.Admin<S, ?> preTraversal = null; private Traversal.Admin<S, V> valueTraversal = this.integrateChild(__.fold().asAdmin()); - private Traversal.Admin<S, ?> preTraversal = null; // used in OLAP - private ReducingBarrierStep reducingBarrierStep = null; // used in OLAP - private boolean onGraphComputer = false; /// private String sideEffectKey; public GroupSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) { super(traversal); this.sideEffectKey = sideEffectKey; - this.getTraversal().getSideEffects().registerIfAbsent(this.sideEffectKey, HashMapSupplier.instance(), new GroupStep.GroupBiOperator<>(this.valueTraversal, this.onGraphComputer)); + this.valueTraversal = this.integrateChild(__.fold().asAdmin()); + this.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(this.valueTraversal).get(0)); + this.getTraversal().getSideEffects().registerIfAbsent(this.sideEffectKey, HashMapSupplier.instance(), new GroupStep.GroupBiOperator<>(this.valueTraversal)); } @Override @@ -69,7 +64,8 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem this.state = 'v'; } else if ('v' == this.state) { this.valueTraversal = this.integrateChild(GroupStep.convertValueTraversal(kvTraversal)); - this.getTraversal().getSideEffects().register(this.sideEffectKey, null, new GroupStep.GroupBiOperator<>(this.valueTraversal, this.onGraphComputer)); + this.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(this.valueTraversal).get(0)); + 
this.getTraversal().getSideEffects().register(this.sideEffectKey, null, new GroupStep.GroupBiOperator<>(this.valueTraversal)); this.state = 'x'; } else { throw new IllegalStateException("The key and value traversals for group()-step have already been set: " + this); @@ -78,22 +74,14 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem @Override protected void sideEffect(final Traverser.Admin<S> traverser) { - final Map<K, Object> map = new HashMap<>(1); - final K key = TraversalUtil.applyNullable(traverser, this.keyTraversal); - if (this.onGraphComputer) { - if (null == this.reducingBarrierStep) { - final TraverserSet traverserSet = new TraverserSet<>(); - this.preTraversal.reset(); - this.preTraversal.addStart(traverser.split()); - this.preTraversal.getEndStep().forEachRemaining(traverserSet::add); - map.put(key, traverserSet); - } else { - this.valueTraversal.reset(); - this.valueTraversal.addStart(traverser.split()); - map.put(key, (V) this.reducingBarrierStep.nextBarrier()); - } - } else - map.put(key, traverser.split()); + final Map<K, V> map = new HashMap<>(1); + final TraverserSet midTraversers = new TraverserSet<>(); + this.preTraversal.reset(); + this.preTraversal.addStart(traverser.split()); + while (this.preTraversal.hasNext()) { + midTraversers.add(this.preTraversal.getEndStep().next()); + } + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) midTraversers); this.getTraversal().getSideEffects().add(this.sideEffectKey, map); } @@ -127,11 +115,7 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem if (null != this.keyTraversal) clone.keyTraversal = this.keyTraversal.clone(); clone.valueTraversal = this.valueTraversal.clone(); - if (null != this.preTraversal) - clone.preTraversal = this.preTraversal.clone(); - final Optional<Barrier> optional = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal); - if (optional.isPresent() && optional.get() 
instanceof ReducingBarrierStep) - clone.reducingBarrierStep = (ReducingBarrierStep) optional.get(); + clone.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(clone.valueTraversal).get(0)); return clone; } @@ -153,15 +137,6 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem @Override public Map<K, V> generateFinalResult(final Map<K, ?> object) { - return GroupStep.doFinalReduction((Map<K, Object>) object, this.valueTraversal, this.onGraphComputer); - } - - @Override - public void onGraphComputer() { - this.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(this.valueTraversal).get(0)); - final Optional<Barrier> optional = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal); - if (optional.isPresent() && optional.get() instanceof ReducingBarrierStep) - this.reducingBarrierStep = (ReducingBarrierStep) optional.get(); - this.getTraversal().getSideEffects().register(this.sideEffectKey, null, new GroupStep.GroupBiOperator<>(this.valueTraversal, this.onGraphComputer = true)); + return GroupStep.doFinalReduction((Map<K, Object>) object, this.valueTraversal); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c890ceba/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy ---------------------------------------------------------------------- diff --git a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy index 619815f..ddfb94a 100644 --- a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy +++ b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy @@ -108,5 +108,10 @@ public 
abstract class GroovyGroupTest { public Traversal<Vertex, Map<String, Map<Object, Object>>> get_g_V_repeatXunionXoutXknowsX_groupXaX_byXageX__outXcreatedX_groupXbX_byXnameX_byXcountXX_groupXaX_byXnameXX_timesX2X_capXa_bX() { new ScriptTraversal<>(g, "gremlin-groovy", "g.V.repeat(union(out('knows').group('a').by('age'), out('created').group('b').by('name').by(count())).group('a').by('name')).times(2).cap('a', 'b')") } + + @Override + public Traversal<Vertex, Map<Long, Map<String, List<Vertex>>>> get_g_V_group_byXbothE_countX_byXgroup_byXlabelXX() { + new ScriptTraversal<>(g, "gremlin-groovy", "g.V.group().by(bothE().count).by(group.by(label))") + } } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c890ceba/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java index d844ba7..d4c4d74 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java @@ -84,6 +84,8 @@ public abstract class GroupTest extends AbstractGremlinProcessTest { public abstract Traversal<Vertex, Map<String, Map<Object, Object>>> get_g_V_repeatXunionXoutXknowsX_groupXaX_byXageX__outXcreatedX_groupXbX_byXnameX_byXcountXX_groupXaX_byXnameXX_timesX2X_capXa_bX(); + public abstract Traversal<Vertex, Map<Long, Map<String, List<Vertex>>>> get_g_V_group_byXbothE_countX_byXgroup_byXlabelXX(); + @Test @LoadGraphWith(MODERN) public void g_V_group_byXnameX() { @@ -356,6 +358,41 @@ public abstract class GroupTest extends AbstractGremlinProcessTest { 
checkSideEffects(traversal.asAdmin().getSideEffects(), "a", HashMap.class, "b", HashMap.class); } + @Test + @LoadGraphWith(MODERN) + public void g_V_group_byXbothE_countX_byXgroup_byXlabelXX() { + final Traversal<Vertex, Map<Long, Map<String, List<Vertex>>>> traversal = get_g_V_group_byXbothE_countX_byXgroup_byXlabelXX(); + final Map<Long, Map<String, List<Vertex>>> map = traversal.next(); + assertFalse(traversal.hasNext()); + assertEquals(2, map.size()); + assertTrue(map.containsKey(1l)); + assertTrue(map.containsKey(3l)); + // + Map<String, List<Vertex>> submap = map.get(1l); + assertEquals(2, submap.size()); + assertTrue(submap.containsKey("software")); + assertTrue(submap.containsKey("person")); + List<Vertex> list = submap.get("software"); + assertEquals(1, list.size()); + assertEquals(convertToVertex(graph, "ripple"), list.get(0)); + list = submap.get("person"); + assertEquals(2, list.size()); + assertTrue(list.contains(convertToVertex(graph, "vadas"))); + assertTrue(list.contains(convertToVertex(graph, "peter"))); + // + submap = map.get(3l); + assertEquals(2, submap.size()); + assertTrue(submap.containsKey("software")); + assertTrue(submap.containsKey("person")); + list = submap.get("software"); + assertEquals(1, list.size()); + assertEquals(convertToVertex(graph, "lop"), list.get(0)); + list = submap.get("person"); + assertEquals(2, list.size()); + assertTrue(list.contains(convertToVertex(graph, "marko"))); + assertTrue(list.contains(convertToVertex(graph, "josh"))); + } + public static class Traversals extends GroupTest { @Override @@ -437,5 +474,10 @@ public abstract class GroupTest extends AbstractGremlinProcessTest { public Traversal<Vertex, Map<String, Map<Object, Object>>> get_g_V_repeatXunionXoutXknowsX_groupXaX_byXageX__outXcreatedX_groupXbX_byXnameX_byXcountXX_groupXaX_byXnameXX_timesX2X_capXa_bX() { return g.V().repeat(__.union(__.out("knows").group("a").by("age"), 
__.out("created").group("b").by("name").by(count())).group("a").by("name")).times(2).cap("a", "b"); } + + @Override + public Traversal<Vertex, Map<Long, Map<String, List<Vertex>>>> get_g_V_group_byXbothE_countX_byXgroup_byXlabelXX() { + return g.V().<Long, Map<String, List<Vertex>>>group().by(__.bothE().count()).by(__.group().by(T.label)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c890ceba/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java index 768d10a..5c6d729 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java @@ -126,7 +126,6 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte } }).fold(endStep.getSeedSupplier().get(), biOperator::apply); } else if (endStep instanceof GroupStep) { - ((GroupStep) endStep).onGraphComputer(); final GroupStep.GroupBiOperator<Object, Object> biOperator = (GroupStep.GroupBiOperator) endStep.getBiOperator(); result = ((GroupStep) endStep).generateFinalResult(nextRDD. 
mapPartitions(partitions -> { http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c890ceba/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java index e339519..5557716 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java @@ -86,11 +86,11 @@ public class TinkerGraphPlayTest { @Ignore public void benchmarkGroup() throws Exception { Graph graph = TinkerGraph.open(); - GraphTraversalSource g = graph.traversal().withComputer(); + GraphTraversalSource g = graph.traversal(); graph.io(GraphMLIo.build()).readGraph("../data/grateful-dead.xml"); ///////// - g.V().group().by(T.label).by(values("name")).forEachRemaining(x -> logger.info(x.toString())); + //g.V().group().by(T.label).by(values("name")).forEachRemaining(x -> logger.info(x.toString())); System.out.println("group: " + g.V().both("followedBy").both("followedBy").group().by("songType").by(count()).next()); System.out.println("groupV3d0: " + g.V().both("followedBy").both("followedBy").groupV3d0().by("songType").by().by(__.count(Scope.local)).next());
