Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 b87022bf5 -> b81dbc3b7
being a bit smarter about DedupGlobalStep. If you are at a worker, simply filter. This may (not always) limit data flow. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b81dbc3b Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b81dbc3b Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b81dbc3b Branch: refs/heads/TINKERPOP-1564 Commit: b81dbc3b7ba0bada9203aa03e307cd093e758de4 Parents: b87022b Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Tue Jan 24 16:00:09 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Tue Jan 24 16:00:09 2017 -0700 ---------------------------------------------------------------------- .../gremlin/akka/process/actors/AkkaGraphActors.java | 6 +++--- .../gremlin/akka/process/actors/AkkaPlayTest.java | 10 ++++++---- .../process/traversal/step/filter/DedupGlobalStep.java | 6 +++--- .../process/actors/TestSetupTerminateActorProgram.java | 9 ++++----- 4 files changed, 16 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java index 26f27c1..125cae4 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java @@ -95,9 +95,9 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getMasterActorDeployment(finalConfiguration)))), "master"); return FutureConverters.<ActorsResult<R>>toJava((scala.concurrent.Future) Patterns.ask(master, new DefaultActorsResult<>(), 10000000)). - thenApply(x -> { - ((ActorsResult) x).setRuntime(System.currentTimeMillis() - startTime); - return x; + thenApply(actorsResult -> { + ((ActorsResult) actorsResult).setRuntime(System.currentTimeMillis() - startTime); + return actorsResult; }).toCompletableFuture(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java index 32b543c..ffa9f5b 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java @@ -31,6 +31,8 @@ import org.junit.Test; import java.util.Map; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.dedup; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ @@ -45,19 +47,19 @@ public class AkkaPlayTest { configuration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo"); final Graph graph = TinkerGraph.open(configuration); //graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); - GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(15)); + GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3)); // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); // [{v[1]=6, v[2]=2, v[3]=6, v[4]=6, v[5]=2, v[6]=2}] System.out.println(g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").cap("a").toList()); System.out.println(g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").cap("a").toList()); for (int i = 0; i < 1000; i++) { - Map<Object, Long> map = g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").<Map>cap("a").next(); + /*Map<Object, Long> map = g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").<Map>cap("a").next(); if (24L != map.values().stream().reduce((a, b) -> a + b).get()) { System.out.println(i + " -- " + map); assert false; - } - //assert 0L == g.V().repeat(dedup()).times(2).count().next(); + }*/ + assert 0L == g.V().repeat(dedup()).times(2).count().next(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java index 8ccccfd..7724dae 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java @@ -59,7 +59,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal private Set<String> keepLabels; private Map<Object, Traverser.Admin<S>> barrier; private Iterator<Map.Entry<Object, Traverser.Admin<S>>> barrierIterator; - private boolean atWorker = true; + private boolean atWorker = false; private boolean pushBased = false; public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) { @@ -69,7 +69,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal @Override protected boolean filter(final Traverser.Admin<S> traverser) { - if (this.pushBased && this.atWorker) return true; + if (this.pushBased && this.atWorker) return false; traverser.setBulk(1); if (null == this.dedupLabels) { return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal)); @@ -186,7 +186,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal } else { object = TraversalUtil.applyNullable(traverser, this.dedupTraversal); } - if (this.duplicateSet.add(object)) { + if (this.duplicateSet.add(object) && !map.containsKey(object)) { traverser.setBulk(1L); // traverser.detach(); traverser.set(DetachedFactory.detach(traverser.get(), true)); // TODO: detect required detachment accordingly http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java index 58d2d50..cffc61a 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java @@ -35,7 +35,6 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> { private static final String WORKER_SETUP = "workerSetup"; private static final String WORKER_TERMINATE = "workerTerminate"; private static final String MASTER_SETUP = "masterSetup"; - private static final String MASTER_TERMINATE = "masterTerminate"; @Override public Worker createWorkerProgram(final Actor.Worker worker) { @@ -53,7 +52,7 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> { @Override public void execute(final String message) { - fail("The worker should not have received any messages"); + fail("The worker should not have received any messages: " + message); } @Override @@ -65,7 +64,7 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> { } @Override - public Master createMasterProgram(Actor.Master master) { + public Master createMasterProgram(final Actor.Master master) { return new Master<String>() { private int masterSetup = 0; private int masterTerminate = 0; @@ -108,12 +107,12 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> { assertEquals(this.workerSetup, this.workerTerminate); assertEquals(this.workerSetup, master.workers().size()); assertEquals(1, this.masterSetup); - assertEquals(0, this.masterTerminate); + assertEquals(0, this.masterTerminate++); master.setResult(Arrays.asList( this.workerSetup, this.workerTerminate, this.masterSetup, - ++this.masterTerminate)); + this.masterTerminate)); } }; }