Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1606 [created] f53327b7f
Got a super simple implementation of GroupStep working. Tada. I think I can get it even simpler too.... Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f53327b7 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f53327b7 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f53327b7 Branch: refs/heads/TINKERPOP-1606 Commit: f53327b7fe7ba927a04400739d30b9f28ec85ac2 Parents: d1f0889 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Jan 18 14:17:35 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Wed Jan 18 14:17:35 2017 -0700 ---------------------------------------------------------------------- .../process/traversal/step/map/GroupStep.java | 191 ++++--------------- .../step/sideEffect/GroupSideEffectStep.java | 20 +- 2 files changed, 49 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f53327b7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java index d6ce421..4d229d1 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java @@ -31,7 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating; import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep; import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; @@ -40,7 +40,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import org.javatuples.Pair; import java.io.IOException; import java.io.ObjectInputStream; @@ -90,15 +89,19 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> public Map<K, V> projectTraverser(final Traverser.Admin<S> traverser) { final Map<K, V> map = new HashMap<>(1); if (null == this.preTraversal) { - map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser); + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) new TraverserSet<>(traverser)); } else { - final TraverserSet traverserSet = new TraverserSet<>(); this.preTraversal.reset(); this.preTraversal.addStart(traverser); - while (this.preTraversal.hasNext()) { - traverserSet.add(this.preTraversal.nextTraverser()); - } - map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); + final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.preTraversal).orElse(null); + if (null == barrierStep) { + final TraverserSet set = new TraverserSet(); + while (this.preTraversal.hasNext()) { + set.add(this.preTraversal.nextTraverser()); + } + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) set); + } else if (barrierStep.hasNextBarrier()) + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) barrierStep.nextBarrier()); } return map; } @@ -160,20 +163,18 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable, Cloneable { - // size limit before Barrier.processAllStarts() to lazy reduce - private static final int SIZE_LIMIT = 1000; - - private Traversal.Admin<?, V> valueTraversal; private Barrier barrierStep; public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal) { // if there is a lambda that can not be serialized, then simply use TraverserSets - if (TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, valueTraversal)) { - this.valueTraversal = null; + if (TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, valueTraversal)) this.barrierStep = null; - } else { - this.valueTraversal = valueTraversal.clone(); - this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null); + else { + this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal.clone()).orElse(null); + if (this.barrierStep instanceof CollectingBarrierStep) + this.barrierStep = null; + if (null != this.barrierStep) + this.barrierStep = (Barrier) ((Step) this.barrierStep).clone(); } } @@ -185,10 +186,8 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> public GroupBiOperator<K, V> clone() { try { final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) super.clone(); - if (null != this.valueTraversal) { - clone.valueTraversal = this.valueTraversal.clone(); - clone.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal).orElse(null); - } + if (null != this.barrierStep) + clone.barrierStep = (Barrier) ((Step) this.barrierStep).clone(); return clone; } catch (final CloneNotSupportedException e) { throw new IllegalStateException(e.getMessage(), e); @@ -200,120 +199,17 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> for (final K key : mapB.keySet()) { Object objectA = mapA.get(key); final Object objectB = mapB.get(key); - assert null != objectB; if (null == objectA) { objectA = objectB; + } else if (null == objectB) { + } else { - // TRAVERSER - if (objectA instanceof Traverser.Admin) { - if (objectB instanceof Traverser.Admin) { - final TraverserSet set = new TraverserSet(); - set.add((Traverser.Admin) objectA); - set.add((Traverser.Admin) objectB); - objectA = set; - } else if (objectB instanceof TraverserSet) { - final TraverserSet set = (TraverserSet) objectB; - set.add((Traverser.Admin) objectA); - if (null != this.barrierStep && set.size() > SIZE_LIMIT) { - this.valueTraversal.reset(); - ((Step) this.barrierStep).addStarts(set.iterator()); - objectA = this.barrierStep.nextBarrier(); - } else - objectA = objectB; - } else if (objectB instanceof Pair) { - final TraverserSet set = (TraverserSet) ((Pair) objectB).getValue0(); - set.add((Traverser.Admin) objectA); - if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check - this.valueTraversal.reset(); - ((Step) this.barrierStep).addStarts(set.iterator()); - this.barrierStep.addBarrier(((Pair) objectB).getValue1()); - objectA = this.barrierStep.nextBarrier(); - } else - objectA = Pair.with(set, ((Pair) objectB).getValue1()); - } else - objectA = Pair.with(new TraverserSet((Traverser.Admin) objectA), objectB); - // TRAVERSER SET - } else if (objectA instanceof TraverserSet) { - if (objectB instanceof Traverser.Admin) { - final TraverserSet set = (TraverserSet) objectA; - set.add((Traverser.Admin) objectB); - if (null != this.barrierStep && set.size() > SIZE_LIMIT) { - this.valueTraversal.reset(); - ((Step) this.barrierStep).addStarts(set.iterator()); - objectA = this.barrierStep.nextBarrier(); - } - } else if (objectB instanceof TraverserSet) { - final TraverserSet set = (TraverserSet) objectA; - set.addAll((TraverserSet) objectB); - if (null != this.barrierStep && set.size() > SIZE_LIMIT) { - this.valueTraversal.reset(); - ((Step) this.barrierStep).addStarts(set.iterator()); - objectA = this.barrierStep.nextBarrier(); - } - } else if (objectB instanceof Pair) { - final TraverserSet set = (TraverserSet) objectA; - set.addAll((TraverserSet) ((Pair) objectB).getValue0()); - if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check - this.valueTraversal.reset(); - ((Step) this.barrierStep).addStarts(set.iterator()); - this.barrierStep.addBarrier(((Pair) objectB).getValue1()); - objectA = this.barrierStep.nextBarrier(); - } else - objectA = Pair.with(set, ((Pair) objectB).getValue1()); - } else - objectA = Pair.with(objectA, objectB); - // TRAVERSER SET + BARRIER - } else if (objectA instanceof Pair) { - if (objectB instanceof Traverser.Admin) { - final TraverserSet set = ((TraverserSet) ((Pair) objectA).getValue0()); - set.add((Traverser.Admin) objectB); - if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check - this.valueTraversal.reset(); - ((Step) this.barrierStep).addStarts(set.iterator()); - this.barrierStep.addBarrier(((Pair) objectA).getValue1()); - objectA = this.barrierStep.nextBarrier(); - } - } else if (objectB instanceof TraverserSet) { - final TraverserSet set = (TraverserSet) ((Pair) objectA).getValue0(); - set.addAll((TraverserSet) objectB); - if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check - this.valueTraversal.reset(); - ((Step) this.barrierStep).addStarts(set.iterator()); - this.barrierStep.addBarrier(((Pair) objectA).getValue1()); - objectA = this.barrierStep.nextBarrier(); - } - } else if (objectB instanceof Pair) { - this.valueTraversal.reset(); - this.barrierStep.addBarrier(((Pair) objectA).getValue1()); - this.barrierStep.addBarrier(((Pair) objectB).getValue1()); - ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); - ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); - objectA = this.barrierStep.nextBarrier(); - } else { - this.valueTraversal.reset(); - this.barrierStep.addBarrier(((Pair) objectA).getValue1()); - this.barrierStep.addBarrier(objectB); - ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); - objectA = this.barrierStep.nextBarrier(); - } - // BARRIER + if (null == this.barrierStep) { + ((TraverserSet) objectA).addAll((TraverserSet) objectB); } else { - if (objectB instanceof Traverser.Admin) { - objectA = Pair.with(new TraverserSet<>((Traverser.Admin) objectB), objectA); - } else if (objectB instanceof TraverserSet) { - objectA = Pair.with(objectB, objectA); - } else if (objectB instanceof Pair) { - this.valueTraversal.reset(); - this.barrierStep.addBarrier(objectA); - this.barrierStep.addBarrier(((Pair) objectB).getValue1()); - ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); - objectA = this.barrierStep.nextBarrier(); - } else { - this.valueTraversal.reset(); - this.barrierStep.addBarrier(objectA); - this.barrierStep.addBarrier(objectB); - objectA = this.barrierStep.nextBarrier(); - } + this.barrierStep.addBarrier(objectA); + this.barrierStep.addBarrier(objectB); + objectA = this.barrierStep.nextBarrier(); } } mapA.put(key, (V) objectA); @@ -321,16 +217,12 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> return mapA; } - // necessary to control Java Serialization to ensure proper clearing of internal traverser data private void writeObject(final ObjectOutputStream outputStream) throws IOException { - // necessary as a non-root child is being sent over the wire - if (null != this.valueTraversal) this.valueTraversal.setParent(EmptyStep.instance()); - outputStream.writeObject(null == this.valueTraversal ? null : this.valueTraversal.clone()); // todo: reset() instead? + outputStream.writeObject(this.barrierStep); // todo: reset() instead? } private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException { - this.valueTraversal = (Traversal.Admin<?, V>) inputStream.readObject(); - this.barrierStep = null == this.valueTraversal ? null : TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null); + this.barrierStep = (Barrier) inputStream.readObject(); } } @@ -354,35 +246,24 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> final Traversal.Admin<?, ?> first = __.identity().asAdmin(); boolean updated = false; for (final Step step : valueTraversal.getSteps()) { - if (step instanceof Barrier) - break; first.addStep(step.clone()); updated = true; + if (step instanceof Barrier) + break; + } return updated ? first : null; } public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, final Traversal.Admin<?, V> valueTraversal) { final Map<K, V> reducedMap = new HashMap<>(map.size()); - final Barrier reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null); + final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null); IteratorUtils.removeOnNext(map.entrySet().iterator()).forEachRemaining(entry -> { - if (null == reducingBarrierStep) { - if (entry.getValue() instanceof TraverserSet) { - if (!((TraverserSet) entry.getValue()).isEmpty()) - reducedMap.put(entry.getKey(), ((TraverserSet<V>) entry.getValue()).peek().get()); - } else - reducedMap.put(entry.getKey(), (V) entry.getValue()); + if (null == barrierStep) { + reducedMap.put(entry.getKey(), ((TraverserSet<V>) entry.getValue()).peek().get()); } else { valueTraversal.reset(); - if (entry.getValue() instanceof Traverser.Admin) - ((Step) reducingBarrierStep).addStart((Traverser.Admin) entry.getValue()); - else if (entry.getValue() instanceof TraverserSet) - ((Step) reducingBarrierStep).addStarts(((TraverserSet) entry.getValue()).iterator()); - else if (entry.getValue() instanceof Pair) { - ((Step) reducingBarrierStep).addStarts(((TraverserSet) (((Pair) entry.getValue()).getValue0())).iterator()); - reducingBarrierStep.addBarrier((((Pair) entry.getValue()).getValue1())); - } else - reducingBarrierStep.addBarrier(entry.getValue()); + barrierStep.addBarrier(entry.getValue()); if (valueTraversal.hasNext()) reducedMap.put(entry.getKey(), valueTraversal.next()); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f53327b7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java index 0e8a4f5..bba795b 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java @@ -21,12 +21,14 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating; import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier; @@ -76,15 +78,19 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem protected void sideEffect(final Traverser.Admin<S> traverser) { final Map<K, V> map = new HashMap<>(1); if (null == this.preTraversal) { - map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser.split()); + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) new TraverserSet<>(traverser)); } else { - final TraverserSet traverserSet = new TraverserSet<>(); this.preTraversal.reset(); - this.preTraversal.addStart(traverser.split()); - while(this.preTraversal.hasNext()) { - traverserSet.add(this.preTraversal.nextTraverser()); - } - map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); + this.preTraversal.addStart(traverser); + final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.preTraversal).orElse(null); + if (null == barrierStep) { + final TraverserSet traverserSet = new TraverserSet(); + while (this.preTraversal.hasNext()) { + traverserSet.add(this.preTraversal.nextTraverser()); + } + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); + } else if (barrierStep.hasNextBarrier()) + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) barrierStep.nextBarrier()); } this.getTraversal().getSideEffects().add(this.sideEffectKey, map); }