Created a new module called akka-gremlin. It will implement the actor/ package
interfaces (yet to be defined in gremlin-core).


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/93d58caf
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/93d58caf
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/93d58caf

Branch: refs/heads/TINKERPOP-1564
Commit: 93d58caff76dea3fb1f668ff4e50f1b4ce2e904b
Parents: d4dcae2
Author: Marko A. Rodriguez <[email protected]>
Authored: Tue Dec 13 05:08:56 2016 -0700
Committer: Marko A. Rodriguez <[email protected]>
Committed: Wed Jan 4 05:07:59 2017 -0700

----------------------------------------------------------------------
 akka-gremlin/pom.xml                            |  82 ++++++++
 .../gremlin/akka/process/actor/AkkaActors.java  |  53 +++++
 .../process/actor/MasterTraversalActor.java     | 176 +++++++++++++++++
 .../akka/process/actor/TraverserMailbox.java    | 132 +++++++++++++
 .../process/actor/WorkerTraversalActor.java     | 192 +++++++++++++++++++
 .../actor/WorkerTraversalSideEffects.java       | 147 ++++++++++++++
 .../actor/message/BarrierAddMessage.java        |  47 +++++
 .../actor/message/BarrierDoneMessage.java       |  41 ++++
 .../actor/message/SideEffectAddMessage.java     |  43 +++++
 .../process/actor/message/StartMessage.java     |  35 ++++
 .../actor/message/VoteToHaltMessage.java        |  36 ++++
 .../process/traversal/step/map/ActorStep.java   |  76 ++++++++
 .../strategy/decoration/ActorStrategy.java      |  83 ++++++++
 .../verification/ActorVerificationStrategy.java |  60 ++++++
 .../src/main/resources/application.conf         |  11 ++
 .../process/AkkaActorsProcessStandardTest.java  |  33 ++++
 .../akka/process/AkkaActorsProvider.java        | 140 ++++++++++++++
 .../gremlin/akka/process/AkkaPlayTest.java      |  82 ++++++++
 .../step/util/CollectingBarrierStep.java        |   7 +-
 .../tinkerpop/gremlin/structure/Graph.java      |  12 +-
 .../tinkerpop/gremlin/structure/Partition.java  |  73 +++++++
 .../gremlin/structure/Partitioner.java          |  33 ++++
 .../util/partitioner/GlobalPartitioner.java     |  84 ++++++++
 .../util/partitioner/HashPartitioner.java       |  96 ++++++++++
 pom.xml                                         |   2 +
 25 files changed, 1773 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/akka-gremlin/pom.xml b/akka-gremlin/pom.xml
new file mode 100644
index 0000000..e9f5345
--- /dev/null
+++ b/akka-gremlin/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~  http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing,
+  ~  software distributed under the License is distributed on an
+  ~  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~  KIND, either express or implied.  See the License for the
+  ~  specific language governing permissions and limitations
+  ~  under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>tinkerpop</artifactId>
+        <groupId>org.apache.tinkerpop</groupId>
+        <version>3.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>akka-gremlin</artifactId>
+    <name>Apache TinkerPop :: Akka Gremlin</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_2.11</artifactId>
+            <version>2.4.14</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.11.8</version>
+        </dependency>
+        <!-- TEST -->
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-test</artifactId>
+            <version>3.3.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>tinkergraph-gremlin</artifactId>
+            <version>3.3.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <directory>${basedir}/target</directory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
new file mode 100644
index 0000000..aa2a048
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -0,0 +1,53 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AkkaActors<S, E> {
+
+    public final ActorSystem system;
+    private TraverserSet<E> results = new TraverserSet<>();
+
+    public AkkaActors(final Traversal.Admin<S, E> traversal, final Partitioner 
partitioner) {
+        this.system = ActorSystem.create("traversal-" + traversal.hashCode());
+        this.system.actorOf(Props.create(MasterTraversalActor.class, 
traversal.clone(), partitioner, this.results), "master");
+    }
+
+    public Future<TraverserSet<E>> getResults() {
+        return CompletableFuture.supplyAsync(() -> {
+            while (!this.system.isTerminated()) {
+
+            }
+            return this.results;
+        });
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
new file mode 100644
index 0000000..d1a29d8
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
@@ -0,0 +1,176 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
+import 
org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification.ActorVerificationStrategy;
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MasterTraversalActor extends AbstractActor implements 
RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+
+    private final Traversal.Admin<?, ?> traversal;
+    private final TraversalMatrix<?, ?> matrix;
+    private final Partitioner partitioner;
+    private final Map<String, ActorSelection> workers = new HashMap<>();
+    private Map<String, Barrier> barriers = new HashMap<>();
+    private final TraverserSet<?> results;
+    private final String leaderWorker;
+
+    public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final 
Partitioner partitioner, final TraverserSet<?> results) {
+        System.out.println("master[created]: " + self().path());
+        final TraversalStrategies strategies = 
traversal.getStrategies().clone();
+        strategies.removeStrategies(ComputerVerificationStrategy.class, 
StandardVerificationStrategy.class);
+        strategies.addStrategies(ActorVerificationStrategy.instance());
+        traversal.setStrategies(strategies);
+        traversal.applyStrategies();
+
+        this.traversal = ((TraversalVertexProgramStep) 
traversal.getStartStep()).computerTraversal.get();
+        System.out.println(this.traversal);
+        this.matrix = new TraversalMatrix<>(this.traversal);
+        this.partitioner = partitioner;
+        this.results = results;
+        this.initializeWorkers();
+        this.leaderWorker = "worker-" + 
this.partitioner.getPartitions().get(0).hashCode();
+
+        receive(ReceiveBuilder.
+                match(Traverser.Admin.class, traverser -> {
+                    this.processTraverser(traverser);
+                }).
+                match(BarrierAddMessage.class, barrierMerge -> {
+                    // get the barrier updates from the workers to synchronize 
against the master barrier
+                    final Barrier barrier = (Barrier) 
this.matrix.getStepById(barrierMerge.getStepId());
+                    final Step<?, ?> step = (Step) barrier;
+                    GraphComputing.atMaster(step, true);
+                    barrier.addBarrier(barrierMerge.getBarrier());
+                    this.barriers.put(step.getId(), barrier);
+                }).
+                match(SideEffectAddMessage.class, sideEffect -> {
+                    // get the side-effect updates from the workers to 
generate the master side-effects
+                    
this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), 
sideEffect.getSideEffectValue());
+                }).
+                match(VoteToHaltMessage.class, voteToHalt -> {
+                    assert !sender().equals(self());
+                    if (!this.barriers.isEmpty()) {
+                        for (final Barrier barrier : this.barriers.values()) {
+                            final Step<?, ?> step = (Step) barrier;
+                            if (!(barrier instanceof LocalBarrier)) {
+                                while (step.hasNext()) {
+                                    this.sendTraverser(step.next());
+                                }
+                            } else {
+                                this.traversal.getSideEffects().forEach((k, v) 
-> {
+                                    this.broadcast(new SideEffectAddMessage(k, 
v));
+                                });
+                                this.broadcast(new 
BarrierDoneMessage(barrier));
+                                barrier.done();
+                            }
+                        }
+                        this.barriers.clear();
+                        
worker(this.leaderWorker).tell(StartMessage.instance(), self());
+                    } else {
+                        while (this.traversal.hasNext()) {
+                            this.results.add((Traverser.Admin) 
this.traversal.nextTraverser());
+                        }
+                        context().system().terminate();
+                    }
+                }).build());
+    }
+
+    private void initializeWorkers() {
+        final List<Partition> partitions = this.partitioner.getPartitions();
+        for (final Partition partition : partitions) {
+            final String workerPathString = "worker-" + partition.hashCode();
+            final ActorRef worker = 
context().actorOf(Props.create(WorkerTraversalActor.class, 
this.traversal.clone(), partition, this.partitioner), workerPathString);
+            this.workers.put(workerPathString, 
context().actorSelection(worker.path()));
+        }
+        for (final ActorSelection worker : this.workers.values()) {
+            worker.tell(StartMessage.instance(), self());
+        }
+        this.workers.clear();
+    }
+
+    private void broadcast(final Object message) {
+        for (final Partition partition : this.partitioner.getPartitions()) {
+            worker("worker-" + partition.hashCode()).tell(message, self());
+        }
+    }
+
+    private void processTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted() || traverser.get() instanceof Element) {
+            this.sendTraverser(traverser);
+        } else {
+            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, 
Object>>getStepById(traverser.getStepId());
+            GraphComputing.atMaster(step, true);
+            step.addStart(traverser);
+            while (step.hasNext()) {
+                this.processTraverser(step.next());
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted())
+            this.results.add(traverser);
+        else if (traverser.get() instanceof Element)
+            worker("worker-" + this.partitioner.getPartition((Element) 
traverser.get()).hashCode()).tell(traverser, self());
+        else
+            self().tell(traverser, self());
+    }
+
+    private ActorSelection worker(final String workerPath) {
+        ActorSelection worker = this.workers.get(workerPath);
+        if (null == worker) {
+            worker = context().actorSelection(workerPath);
+            this.workers.put(workerPath, worker);
+        }
+        return worker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
new file mode 100644
index 0000000..6a6c0f4
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
@@ -0,0 +1,132 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.Envelope;
+import akka.dispatch.MailboxType;
+import akka.dispatch.MessageQueue;
+import akka.dispatch.ProducesMessageQueue;
+import com.typesafe.config.Config;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import scala.Option;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraverserMailbox implements MailboxType, 
ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
+
+    public static class TraverserMessageQueue implements MessageQueue, 
TraverserSetSemantics {
+        private final Queue<Envelope> otherMessages = new LinkedList<>();
+        private final TraverserSet<?> traverserMessages = new TraverserSet<>();
+        private Envelope haltMessage = null;
+        private Envelope terminateToken = null;
+        private final ActorRef owner;
+        private final Object MUTEX = new Object();
+
+        public TraverserMessageQueue(final ActorRef owner) {
+            this.owner = owner;
+        }
+
+        public void enqueue(final ActorRef receiver, final Envelope handle) {
+            synchronized (MUTEX) {
+                if (handle.message() instanceof Traverser.Admin)
+                    this.traverserMessages.offer((Traverser.Admin) 
handle.message());
+                else if (handle.message() instanceof VoteToHaltMessage) {
+                    assert null == this.haltMessage;
+                    this.haltMessage = handle;
+                } else if (handle.message() instanceof 
WorkerTraversalActor.Terminate) {
+                    assert null == this.terminateToken;
+                    this.terminateToken = handle;
+                } else
+                    this.otherMessages.offer(handle);
+            }
+        }
+
+        public Envelope dequeue() {
+            synchronized (MUTEX) {
+                if (!this.otherMessages.isEmpty())
+                    return this.otherMessages.poll();
+                if (!this.traverserMessages.isEmpty())
+                    return new Envelope(this.traverserMessages.poll(), 
this.owner);
+                else if (null != this.terminateToken) {
+                    final Envelope temp = this.terminateToken;
+                    this.terminateToken = null;
+                    return temp;
+                } else {
+                    final Envelope temp = this.haltMessage;
+                    this.haltMessage = null;
+                    return temp;
+                }
+            }
+        }
+
+        public int numberOfMessages() {
+            synchronized (MUTEX) {
+                return this.otherMessages.size() + 
this.traverserMessages.size() + (null == this.haltMessage ? 0 : 1) + (null == 
this.terminateToken ? 0 : 1);
+            }
+        }
+
+        public boolean hasMessages() {
+            synchronized (MUTEX) {
+                return !this.otherMessages.isEmpty() || 
!this.traverserMessages.isEmpty() || null != this.haltMessage || 
this.terminateToken != null;
+            }
+        }
+
+        public void cleanUp(final ActorRef owner, final MessageQueue 
deadLetters) {
+            synchronized (MUTEX) {
+                for (final Envelope handle : this.otherMessages) {
+                    deadLetters.enqueue(owner, handle);
+                }
+                for (final Traverser.Admin<?> traverser : 
this.traverserMessages) {
+                    deadLetters.enqueue(owner, new Envelope(traverser, 
this.owner));
+                }
+                if (null != this.haltMessage) {
+                    deadLetters.enqueue(owner, this.haltMessage);
+                    this.haltMessage = null;
+                }
+                if (null != this.terminateToken) {
+                    deadLetters.enqueue(owner, this.terminateToken);
+                    this.terminateToken = null;
+                }
+            }
+        }
+    }
+
+    // This constructor signature must exist, it will be called by Akka
+    public TraverserMailbox(final ActorSystem.Settings settings, final Config 
config) {
+        // put your initialization code here
+    }
+
+    // The create method is called to create the MessageQueue
+    public MessageQueue create(final Option<ActorRef> owner, final 
Option<ActorSystem> system) {
+        return new TraverserMessageQueue(owner.isEmpty() ? ActorRef.noSender() 
: owner.get());
+    }
+
+    public static interface TraverserSetSemantics {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
new file mode 100644
index 0000000..63eb707
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
@@ -0,0 +1,192 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalActor extends AbstractActor implements 
RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+
+    // terminate token is passed around worker ring to gather termination 
consensus (dual-ring termination algorithm)
+    public enum Terminate {
+        MAYBE, YES, NO
+    }
+
+    private final TraversalMatrix<?, ?> matrix;
+    private final Partition localPartition;
+    private final Partitioner partitioner;
+    //
+    private final Map<String, ActorSelection> workers = new HashMap<>();
+    private final String neighborWorker;
+    private boolean isLeader;
+    private Terminate terminate = null;
+    private boolean voteToHalt = false;
+    private Map<String, Barrier> barriers = new HashMap<>();
+
+    public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final 
Partition localPartition, final Partitioner partitioner) {
+        System.out.println("worker[created]: " + self().path());
+        // set up partition and traversal information
+        this.localPartition = localPartition;
+        this.partitioner = partitioner;
+        final WorkerTraversalSideEffects sideEffects = new 
WorkerTraversalSideEffects(traversal.getSideEffects(), context());
+        TraversalHelper.applyTraversalRecursively(t -> 
t.setSideEffects(sideEffects), traversal);
+        this.matrix = new TraversalMatrix<>(traversal);
+        final GraphStep graphStep = (GraphStep) traversal.getStartStep();
+        if (0 == graphStep.getIds().length)
+            ((GraphStep) 
traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? 
this.localPartition::vertices : this.localPartition::edges);
+        else {
+            if (graphStep.returnsVertex())
+                ((GraphStep<Vertex, Vertex>) 
traversal.getStartStep()).setIteratorSupplier(
+                        () -> 
IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), 
this.localPartition::contains));
+            else
+                ((GraphStep<Edge, Edge>) 
traversal.getStartStep()).setIteratorSupplier(
+                        () -> 
IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), 
this.localPartition::contains));
+        }
+        // create termination ring topology
+        final int i = 
this.partitioner.getPartitions().indexOf(this.localPartition);
+        this.neighborWorker = "../worker-" + 
this.partitioner.getPartitions().get(i == 
this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
+        this.isLeader = i == 0;
+
+        receive(ReceiveBuilder.
+                match(StartMessage.class, start -> {
+                    // initial message from master that says: "start 
processing"
+                    final GraphStep step = (GraphStep) 
this.matrix.getTraversal().getStartStep();
+                    while (step.hasNext()) {
+                        this.sendTraverser(step.next());
+                    }
+                    // internal vote to have in mailbox as final message to 
process
+                    assert null == this.terminate;
+                    if (this.isLeader) {
+                        this.terminate = Terminate.MAYBE;
+                        self().tell(VoteToHaltMessage.instance(), self());
+                    }
+                }).
+                match(Traverser.Admin.class, traverser -> {
+                    this.processTraverser(traverser);
+                }).
+                match(SideEffectAddMessage.class, sideEffect -> {
+                    
this.matrix.getTraversal().getSideEffects().set(sideEffect.getSideEffectKey(), 
sideEffect.getSideEffectValue());
+                }).
+                match(Terminate.class, terminate -> {
+                    assert this.isLeader || this.terminate != Terminate.MAYBE;
+                    this.terminate = terminate;
+                    self().tell(VoteToHaltMessage.instance(), self());
+                }).
+                match(BarrierDoneMessage.class, barrierDone -> {
+                    final Step<?, ?> step = this.matrix.<Object, Object, 
Step<Object, Object>>getStepById(barrierDone.getStepId());
+                    while (step.hasNext()) {
+                        sendTraverser(step.next());
+                    }
+                }).
+                match(VoteToHaltMessage.class, haltSync -> {
+                    // if there is a barrier and thus, halting at barrier, 
then process barrier
+                    if (!this.barriers.isEmpty()) {
+                        for (final Barrier barrier : this.barriers.values()) {
+                            while (barrier.hasNextBarrier()) {
+                                master().tell(new BarrierAddMessage(barrier), 
self());
+                            }
+                        }
+                        this.barriers.clear();
+                        this.voteToHalt = false;
+                    }
+                    // use termination token to determine termination condition
+                    if (null != this.terminate) {
+                        if (this.isLeader) {
+                            if (this.voteToHalt && Terminate.YES == 
this.terminate)
+                                master().tell(VoteToHaltMessage.instance(), 
self());
+                            else
+                                
worker(this.neighborWorker).tell(Terminate.YES, self());
+                        } else
+                            worker(this.neighborWorker).tell(this.voteToHalt ? 
this.terminate : Terminate.NO, self());
+                        this.terminate = null;
+                        this.voteToHalt = true;
+                    }
+                }).build()
+        );
+    }
+
+    private void processTraverser(final Traverser.Admin traverser) {
+        assert !(traverser.get() instanceof Element) || !traverser.isHalted() 
|| this.localPartition.contains((Element) traverser.get());
+        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, 
Object>>getStepById(traverser.getStepId());
+        if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
+        GraphComputing.atMaster(step, false);
+        step.addStart(traverser);
+        if (step instanceof Barrier) {
+            this.barriers.put(step.getId(), (Barrier) step);
+        } else {
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
+        this.voteToHalt = false;
+        if (traverser.isHalted())
+            master().tell(traverser, self());
+        else if (traverser.get() instanceof Element && 
!this.localPartition.contains((Element) traverser.get()))
+            worker("../worker-" + this.partitioner.getPartition((Element) 
traverser.get()).hashCode()).tell(traverser, self());
+        else
+            self().tell(traverser, self());
+    }
+
+    private ActorSelection worker(final String workerPath) {
+        ActorSelection worker = this.workers.get(workerPath);
+        if (null == worker) {
+            worker = context().actorSelection(workerPath);
+            this.workers.put(workerPath, worker);
+        }
+        return worker;
+    }
+
+    private ActorRef master() {
+        return context().parent();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
new file mode 100644
index 0000000..9c03298
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
@@ -0,0 +1,147 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.ActorContext;
+import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * A {@link TraversalSideEffects} wrapper that delegates every call to a wrapped instance,
+ * except {@link #add(String, Object)}, which does not touch the wrapped instance and
+ * instead sends a {@link SideEffectAddMessage} to the parent of the supplied actor context.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalSideEffects implements TraversalSideEffects {
+
+    // wrapped side-effects that all calls other than add() are delegated to
+    private TraversalSideEffects sideEffects;
+    // actor context used by add() to reach the parent actor
+    private ActorContext context;
+
+
+    // leaves both fields unset
+    private WorkerTraversalSideEffects() {
+        // for serialization
+    }
+
+    /**
+     * @param sideEffects the side-effects to delegate to
+     * @param context     the actor context whose parent receives {@link #add} messages
+     */
+    public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, 
final ActorContext context) {
+        this.sideEffects = sideEffects;
+        this.context = context;
+    }
+
+    /**
+     * @return the wrapped side-effects instance (not a copy)
+     */
+    public TraversalSideEffects getSideEffects() {
+        return this.sideEffects;
+    }
+
+    // --- plain delegation to the wrapped side-effects ---
+
+    @Override
+    public void set(final String key, final Object value) {
+        this.sideEffects.set(key, value);
+    }
+
+    @Override
+    public <V> V get(final String key) throws IllegalArgumentException {
+        return this.sideEffects.get(key);
+    }
+
+    @Override
+    public void remove(final String key) {
+        this.sideEffects.remove(key);
+    }
+
+    @Override
+    public Set<String> keys() {
+        return this.sideEffects.keys();
+    }
+
+    /**
+     * Forwards the addition to the parent actor as a {@link SideEffectAddMessage}, with this
+     * context's actor as the sender. The wrapped side-effects are not modified here.
+     *
+     * @param key   the side-effect key
+     * @param value the value to add under {@code key}
+     */
+    @Override
+    public void add(final String key, final Object value) {
+        this.context.parent().tell(new SideEffectAddMessage(key, value), 
this.context.self());
+    }
+
+    // --- plain delegation to the wrapped side-effects ---
+
+    @Override
+    public <V> void register(final String key, final Supplier<V> initialValue, 
final BinaryOperator<V> reducer) {
+        this.sideEffects.register(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> void registerIfAbsent(final String key, final Supplier<V> 
initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> BinaryOperator<V> getReducer(final String key) {
+        return this.sideEffects.getReducer(key);
+    }
+
+    @Override
+    public <V> Supplier<V> getSupplier(final String key) {
+        return this.sideEffects.getSupplier(key);
+    }
+
+    @Override
+    @Deprecated
+    public void registerSupplier(final String key, final Supplier supplier) {
+        this.sideEffects.registerSupplier(key, supplier);
+    }
+
+    @Override
+    @Deprecated
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+        return this.sideEffects.getRegisteredSupplier(key);
+    }
+
+    @Override
+    public <S> void setSack(final Supplier<S> initialValue, final 
UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+        this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+    }
+
+    @Override
+    public <S> Supplier<S> getSackInitialValue() {
+        return this.sideEffects.getSackInitialValue();
+    }
+
+    @Override
+    public <S> UnaryOperator<S> getSackSplitter() {
+        return this.sideEffects.getSackSplitter();
+    }
+
+    @Override
+    public <S> BinaryOperator<S> getSackMerger() {
+        return this.sideEffects.getSackMerger();
+    }
+
+    /**
+     * Clones this wrapper and replaces the copy's delegate with a clone of the wrapped
+     * side-effects. The {@code context} field is not re-assigned on the copy.
+     *
+     * @return the cloned wrapper
+     * @throws IllegalStateException if {@code super.clone()} is not supported
+     */
+    @Override
+    public TraversalSideEffects clone() {
+        try {
+            final WorkerTraversalSideEffects clone = 
(WorkerTraversalSideEffects) super.clone();
+            clone.sideEffects = this.sideEffects.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Delegates the merge to the wrapped side-effects.
+     *
+     * @param sideEffects the target passed through to the wrapped instance's {@code mergeInto}
+     */
+    @Override
+    public void mergeInto(final TraversalSideEffects sideEffects) {
+        this.sideEffects.mergeInto(sideEffects);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
new file mode 100644
index 0000000..4a351c1
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * Message pairing the object returned by {@link Barrier#nextBarrier()} with the id of the
+ * step that the barrier is.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierAddMessage {
+
+    // value obtained from nextBarrier() when the message was constructed
+    private final Object barrier;
+    // id of the step the barrier was cast to
+    private final String stepId;
+
+    /**
+     * Captures the barrier's next value and its step id.
+     * NOTE(review): nextBarrier() is invoked here, at construction time; presumably that
+     * consumes the barrier's current contents - confirm against the Barrier contract.
+     *
+     * @param barrier a barrier that is also a {@link Step} (it is cast unconditionally)
+     */
+    public BarrierAddMessage(final Barrier barrier) {
+        this.barrier = barrier.nextBarrier();
+        this.stepId = ((Step) barrier).getId();
+    }
+
+    /**
+     * @return the value captured from {@code nextBarrier()}
+     */
+    public Object getBarrier() {
+        return this.barrier;
+    }
+
+    /**
+     * @return the id of the barrier step
+     */
+    public String getStepId() {
+        return this.stepId;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
new file mode 100644
index 0000000..208b346
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * Message carrying only the id of a barrier step.
+ * NOTE(review): presumably signals that the identified barrier has completed - confirm
+ * against the actors that send and receive it.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierDoneMessage {
+
+    private final String stepId;
+
+    /**
+     * @param barrier a barrier that is also a {@link Step} (it is cast unconditionally)
+     */
+    public BarrierDoneMessage(final Barrier barrier) {
+        final Step barrierStep = (Step) barrier;
+        this.stepId = barrierStep.getId();
+    }
+
+    /**
+     * @return the id of the barrier step
+     */
+    public String getStepId() {
+        return this.stepId;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
new file mode 100644
index 0000000..4a54d97
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor.message;
+
+/**
+ * Immutable message carrying a side-effect key and a value to be added under that key.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectAddMessage {
+
+    private final String sideEffectKey;
+    private final Object sideEffect;
+
+    /**
+     * @param sideEffectKey the side-effect key
+     * @param sideEffect    the value to add for that key
+     */
+    public SideEffectAddMessage(final String sideEffectKey, final Object sideEffect) {
+        this.sideEffectKey = sideEffectKey;
+        this.sideEffect = sideEffect;
+    }
+
+    /**
+     * @return the value supplied at construction
+     */
+    public Object getSideEffectValue() {
+        return this.sideEffect;
+    }
+
+    /**
+     * @return the key supplied at construction
+     */
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
new file mode 100644
index 0000000..ebc469c
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor.message;
+
+/**
+ * Payload-free singleton message; obtain it through {@link #instance()}.
+ * NOTE(review): presumably sent to kick off traversal execution - confirm against the actors.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StartMessage {
+
+    private static final StartMessage INSTANCE = new StartMessage();
+
+    /**
+     * @return the single shared instance
+     */
+    public static StartMessage instance() {
+        return INSTANCE;
+    }
+
+    private StartMessage() {
+        // singleton: use instance()
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
new file mode 100644
index 0000000..8bfa4c9
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor.message;
+
+/**
+ * Payload-free singleton message; obtain it through {@link #instance()}.
+ * NOTE(review): presumably used by workers to vote for termination - confirm against the actors.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VoteToHaltMessage {
+
+    private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage();
+
+    /**
+     * @return the single shared instance
+     */
+    public static VoteToHaltMessage instance() {
+        return INSTANCE;
+    }
+
+    private VoteToHaltMessage() {
+        // singleton: use instance()
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java
new file mode 100644
index 0000000..db41493
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java
@@ -0,0 +1,76 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.traversal.step.map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+
+import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors;
+import 
org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy;
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.NoSuchElementException;
+
+/**
+ * A step that evaluates a clone of its traversal through {@link AkkaActors}.
+ * <p>
+ * The first call to {@link #processNextStart()} runs the cloned traversal and adds every
+ * result to this step's starts; that call and all later ones then return the next start.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorStep<S, E> extends AbstractStep<E, E> {
+
+    public final Traversal.Admin<S, E> partitionTraversal;
+    private final Partitioner partitioner;
+    private boolean first = true;
+
+    /**
+     * @param traversal   the traversal this step belongs to; a clone of it is what gets executed
+     * @param partitioner the partitioner handed to {@link AkkaActors}
+     */
+    public ActorStep(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+        super(traversal);
+        this.partitioner = partitioner;
+        this.partitionTraversal = (Traversal.Admin) traversal.clone();
+        // on the clone only: drop ActorStrategy and add VertexProgramStrategy
+        final TraversalStrategies cloneStrategies = this.partitionTraversal.getStrategies().clone();
+        cloneStrategies.removeStrategies(ActorStrategy.class);
+        cloneStrategies.addStrategies(VertexProgramStrategy.instance());
+        this.partitionTraversal.setStrategies(cloneStrategies);
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.stepString(this, this.partitionTraversal);
+    }
+
+    @Override
+    protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
+        if (this.first) {
+            this.first = false;
+            this.loadResultsFromActors();
+        }
+        return this.starts.next();
+    }
+
+    /**
+     * Runs the cloned traversal on the actor system and adds each result to the starts.
+     * Any exception raised while obtaining the results is rethrown as an
+     * {@link IllegalStateException} with the original as its cause.
+     */
+    private void loadResultsFromActors() {
+        final AkkaActors<S, E> actors = new AkkaActors<>(this.partitionTraversal, this.partitioner);
+        try {
+            actors.getResults().get().forEach(this.starts::add);
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java
new file mode 100644
index 0000000..adbc257
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java
@@ -0,0 +1,83 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package 
org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration;
+
+import org.apache.tinkerpop.gremlin.akka.process.traversal.step.map.ActorStep;
+import 
org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
+import 
org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Decoration strategy that collapses a traversal into a single {@link ActorStep} built from
+ * that traversal and the configured {@link Partitioner}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorStrategy extends 
AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy>
+        implements TraversalStrategy.DecorationStrategy {
+
+
+    // returned by applyPrior()
+    private static final Set<Class<? extends DecorationStrategy>> PRIORS = 
Collections.singleton(RemoteStrategy.class);
+    // returned by applyPost()
+    private static final Set<Class<? extends DecorationStrategy>> POSTS = 
Collections.singleton(VertexProgramStrategy.class);
+
+    private final Partitioner partitioner;
+
+    /**
+     * @param partitioner the partitioner passed to every {@link ActorStep} this strategy creates
+     */
+    public ActorStrategy(final Partitioner partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    /**
+     * Applies {@link ReadOnlyStrategy} to the traversal, rejects traversals containing an
+     * {@link InjectStep}, and then - only when the traversal's parent is an {@link EmptyStep} -
+     * replaces all of its steps with one {@link ActorStep}.
+     *
+     * @param traversal the traversal to decorate
+     * @throws VerificationException if the traversal contains an {@code InjectStep}
+     */
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        ReadOnlyStrategy.instance().apply(traversal);
+        if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, 
traversal).isEmpty())
+            throw new VerificationException("Inject traversal currently not 
supported", traversal);
+
+        // traversals with a non-empty parent step are left untouched
+        if (!(traversal.getParent() instanceof EmptyStep))
+            return;
+
+        // the ActorStep must be created before the steps are removed: its constructor
+        // clones the traversal it is given
+        final ActorStep<?, ?> actorStep = new ActorStep<>(traversal, 
this.partitioner);
+        TraversalHelper.removeAllSteps(traversal);
+        traversal.addStep(actorStep);
+
+        // validations
+        assert traversal.getStartStep().equals(actorStep);
+        assert traversal.getSteps().size() == 1;
+        assert traversal.getEndStep() == actorStep;
+    }
+
+    /**
+     * @return a singleton set containing {@link VertexProgramStrategy}
+     */
+    @Override
+    public Set<Class<? extends DecorationStrategy>> applyPost() {
+        return POSTS;
+    }
+
+    /**
+     * @return a singleton set containing {@link RemoteStrategy}
+     */
+    @Override
+    public Set<Class<? extends DecorationStrategy>> applyPrior() {
+        return PRIORS;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java
 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java
new file mode 100644
index 0000000..a9e3f7b
--- /dev/null
+++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java
@@ -0,0 +1,60 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package 
org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * Singleton verification strategy that, for traversals on a graph computer, calls
+ * {@code onGraphComputer()} on the {@link GraphComputing} steps of global children and
+ * strips hidden labels from every step.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorVerificationStrategy
+        extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy>
+        implements TraversalStrategy.VerificationStrategy {
+
+    private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy();
+
+    private ActorVerificationStrategy() {
+    }
+
+    /**
+     * Does nothing unless {@link TraversalHelper#onGraphComputer} reports true for the
+     * traversal. Otherwise visits each step, notifying {@link GraphComputing} steps when the
+     * traversal is a global child and removing every label that {@link Graph.Hidden}
+     * reports as hidden.
+     *
+     * @param traversal the traversal to process
+     */
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        if (!TraversalHelper.onGraphComputer(traversal))
+            return;
+        final boolean globalChild = TraversalHelper.isGlobalChild(traversal);
+        for (final Step<?, ?> step : traversal.getSteps()) {
+            // only global children are graph computing
+            if (globalChild && step instanceof GraphComputing)
+                ((GraphComputing) step).onGraphComputer();
+
+            // Iterate over a snapshot: removeLabel() mutates the step's labels, and if
+            // getLabels() is a live view of them, removing during iteration can throw
+            // ConcurrentModificationException.
+            for (final String label : new java.util.ArrayList<>(step.getLabels())) {
+                if (Graph.Hidden.isHidden(label))
+                    step.removeLabel(label);
+            }
+        }
+    }
+
+    /**
+     * @return the single shared instance
+     */
+    public static ActorVerificationStrategy instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf 
b/akka-gremlin/src/main/resources/application.conf
new file mode 100644
index 0000000..706e3a6
--- /dev/null
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -0,0 +1,11 @@
+# Section that declares the TraverserSetSemantics mailbox requirement.
+custom-dispatcher {
+  mailbox-requirement = 
"org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox$TraverserSetSemantics"
+}
+
+# Maps the TraverserSetSemantics requirement to the mailbox section defined below.
+akka.actor.mailbox.requirements {
+  
"org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox$TraverserSetSemantics"
 = custom-dispatcher-mailbox
+}
+
+# Mailbox section naming TraverserMailbox as the mailbox type.
+custom-dispatcher-mailbox {
+  mailbox-type = 
"org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java
 
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java
new file mode 100644
index 0000000..2e84bd9
--- /dev/null
+++ 
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs {@link ProcessStandardSuite} with {@link AkkaActorsProvider} as the graph provider
+ * and {@link TinkerGraph} as the graph. The class body is intentionally empty; the
+ * annotations carry the whole configuration.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessStandardSuite.class)
+@GraphProviderClass(provider = AkkaActorsProvider.class, graph = 
TinkerGraph.class)
+public class AkkaActorsProcessStandardTest {
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
 
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
new file mode 100644
index 0000000..9158040
--- /dev/null
+++ 
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import 
org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class AkkaActorsProvider extends AbstractGraphProvider {
+
+    protected static final boolean IMPORT_STATICS = new Random().nextBoolean();
+
+    private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
+            
"g_V_outXfollowedByX_group_byXsongTypeX_byXbothE_group_byXlabelX_byXweight_sumXX",
+            SideEffectTest.Traversals.class.getCanonicalName(),
+            SubgraphStrategyProcessTest.class.getCanonicalName(),
+            ProfileTest.Traversals.class.getCanonicalName(),
+            PartitionStrategyProcessTest.class.getCanonicalName(),
+            EventStrategyProcessTest.class.getCanonicalName(),
+            ElementIdStrategyProcessTest.class.getCanonicalName(),
+            TraversalInterruptionTest.class.getCanonicalName(),
+            ProgramTest.Traversals.class.getCanonicalName()));
+
+    private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{
+        add(TinkerEdge.class);
+        add(TinkerElement.class);
+        add(TinkerGraph.class);
+        add(TinkerGraphVariables.class);
+        add(TinkerProperty.class);
+        add(TinkerVertex.class);
+        add(TinkerVertexProperty.class);
+    }};
+
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, 
final Class<?> test, final String testMethodName,
+                                                    final 
LoadGraphWith.GraphData loadGraphWith) {
+
+        final TinkerGraph.DefaultIdManager idManager = 
selectIdMakerFromGraphData(loadGraphWith);
+        final String idMaker = 
(idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? 
selectIdMakerFromGraphData(loadGraphWith) : idManager).name();
+        return new HashMap<String, Object>() {{
+            put(Graph.GRAPH, TinkerGraph.class.getName());
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, 
idMaker);
+            put("skipTest", SKIP_TESTS.contains(testMethodName) || 
SKIP_TESTS.contains(test.getCanonicalName()));
+            if (loadGraphWith == LoadGraphWith.GraphData.CREW)
+                
put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, 
VertexProperty.Cardinality.list.name());
+        }};
+    }
+
+    @Override
+    public void clear(final Graph graph, final Configuration configuration) 
throws Exception {
+        if (graph != null) graph.close();
+    }
+
+    @Override
+    public Set<Class> getImplementations() {
+        return IMPLEMENTATION;
+    }
+
+    /**
+     * Tests that load specific graph data can be configured with a 
specific id manager, since the data type to
+     * be used in the test for that graph is known.
+     */
+    protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final 
LoadGraphWith.GraphData loadGraphWith) {
+        if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY;
+        if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else
+            throw new IllegalStateException(String.format("Need to define a 
new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name()));
+    }
+
+/////////////////////////////
+/////////////////////////////
+/////////////////////////////
+
+    @Override
+    public GraphTraversalSource traversal(final Graph graph) {
+        if ((Boolean) graph.configuration().getProperty("skipTest"))
+            return graph.traversal();
+            //throw new VerificationException("This test current does not work 
with Gremlin-Python", EmptyTraversal.instance());
+        else {
+            final GraphTraversalSource g = graph.traversal();
+            return g.withStrategies(new ActorStrategy(new 
HashPartitioner(graph.partitioner(), 3)));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git 
a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
 
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
new file mode 100644
index 0000000..b064331
--- /dev/null
+++ 
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process;
+
+import 
org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Test;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class AkkaPlayTest {
+
+    @Test
+    public void testPlay1() throws Exception {
+        final Graph graph = TinkerGraph.open();
+        graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
+        GraphTraversalSource g = graph.traversal().withStrategies(new 
ActorStrategy(new HashPartitioner(graph.partitioner(), 3)));
+        System.out.println(g.V(1, 2).union(outE().count(), inE().count(), 
(Traversal) outE().values("weight").sum()).toList());
+        //3, 1.9, 1
+        /*for (int i = 0; i < 10000; i++) {
+            final Graph graph = TinkerGraph.open();
+            graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
+            final GraphTraversalSource g = graph.traversal().withComputer();
+            final List<Pair<Integer, Traversal.Admin<?, ?>>> traversals = 
Arrays.asList(
+                    // match() works
+                    Pair.with(6, g.V().match(
+                            as("a").out("created").as("b"),
+                            as("b").in("created").as("c"),
+                            as("b").has("name", P.eq("lop"))).where("a", 
P.neq("c")).select("a", "b", "c").by("name").asAdmin()),
+                    // side-effects work
+                    Pair.with(3, g.V().repeat(both()).times(2).
+                            groupCount("a").by("name").
+                            cap("a").unfold().order().by(Column.values, 
Order.decr).limit(3).asAdmin()),
+                    // barriers work and beyond the local star graph works
+                    Pair.with(1, 
g.V().repeat(both()).times(2).hasLabel("person").
+                            group().
+                            by("name").
+                            
by(out("created").values("name").dedup().fold()).asAdmin()),
+                    // no results works
+                    Pair.with(0, g.V().out("blah").asAdmin())
+            );
+            for (final Pair<Integer,Traversal.Admin<?, ?>> pair : traversals) {
+                final Integer count = pair.getValue0();
+                final Traversal.Admin<?,?> traversal = pair.getValue1();
+                System.out.println("EXECUTING: " + traversal.getBytecode());
+                final TinkerActorSystem<?,?> actors = new 
TinkerActorSystem<>(traversal.clone(),new HashPartitioner(graph.partitioner(), 
3));
+                
System.out.println(IteratorUtils.asList(actors.getResults().get()));
+                if(IteratorUtils.count(actors.getResults().get()) != count)
+                    throw new IllegalStateException();
+                System.out.println("//////////////////////////////////\n");
+            }
+        }
+    }*/
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
index f9c85a2..e4e2cb5 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
@@ -28,6 +28,7 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
 import 
org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Collections;
 import java.util.NoSuchElementException;
@@ -96,8 +97,10 @@ public abstract class CollectingBarrierStep<S> extends 
AbstractStep<S, S> implem
 
     @Override
     public void addBarrier(final TraverserSet<S> barrier) {
-        this.traverserSet = barrier;
-        this.traverserSet.forEach(traverser -> 
traverser.setSideEffects(this.getTraversal().getSideEffects()));
+        
IteratorUtils.removeOnNext(barrier.iterator()).forEachRemaining(traverser -> {
+            traverser.setSideEffects(this.getTraversal().getSideEffects());
+            this.starts.add(traverser);
+        });
         this.barrierConsumer(this.traverserSet);
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java
index 2db37d3..61b17a3 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java
@@ -30,6 +30,7 @@ import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
 import org.apache.tinkerpop.gremlin.structure.util.FeatureDescriptor;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.apache.tinkerpop.gremlin.structure.util.Host;
+import 
org.apache.tinkerpop.gremlin.structure.util.partitioner.GlobalPartitioner;
 import org.javatuples.Pair;
 
 import java.lang.annotation.ElementType;
@@ -331,11 +332,20 @@ public interface Graph extends AutoCloseable, Host {
      * Whatever configuration was passed to {@link 
GraphFactory#open(org.apache.commons.configuration.Configuration)}
      * is what should be returned by this method.
      *
-     * @return the configuration used during graph construction.
+     * @return the configuration used during graph construction
      */
     public Configuration configuration();
 
     /**
+     * Get the {@link Partitioner} describing how the graph's elements are 
partitioned across a cluster.
+     *
+     * @return the partitioner of the graph
+     */
+    public default Partitioner partitioner() {
+        return new GlobalPartitioner(this);
+    }
+
+    /**
      * Graph variables are a set of key/value pairs associated with the graph. 
The keys are String and the values
      * are Objects.
      */

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
new file mode 100644
index 0000000..12faca9
--- /dev/null
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
@@ -0,0 +1,73 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.structure;
+
+import java.net.URI;
+import java.util.Iterator;
+
+/**
+ * A {@code Partition} represents a physical or logical split of the 
underlying {@link Graph} structure.
+ * In distributed graph systems, a physical partition denotes which 
vertices/edges are in the subgraph of the underlying
+ * physical machine. In a logical partition, a physical partition may be split 
amongst multiple threads and thus,
+ * while isolated logically, they are united physically.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Partition {
+
+    /**
+     * Whether or not this element was, is, or will be contained in this 
partition.
+     * Containment is not whether the element currently exists, but instead 
whether, if it did exist, it would be
+     * contained in this partition.
+     *
+     * @param element the element to check for containment
+     * @return whether the element would be contained in this partition
+     */
+    public boolean contains(final Element element);
+
+    /**
+     * The currently existing vertices contained in this partition.
+     *
+     * @param ids filtering to only those ids provided
+     * @return an iterator of vertices contained in the partition
+     */
+    public Iterator<Vertex> vertices(final Object... ids);
+
+    /**
+     * The currently existing edges contained in this partition.
+     *
+     * @param ids filtering to only those ids provided
+     * @return an iterator of edges contained in the partition
+     */
+    public Iterator<Edge> edges(final Object... ids);
+
+    /**
+     * Get the {@link URI} location of the partition.
+     *
+     * @return the location of the partition
+     */
+    public URI location();
+
+    public static interface PhysicalPartition extends Partition {
+    }
+
+    public static interface LogicalPartition extends Partition {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
new file mode 100644
index 0000000..1d4aae1
--- /dev/null
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.structure;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Partitioner {
+
+    public List<Partition> getPartitions();
+
+    public Partition getPartition(final Element element);
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
new file mode 100644
index 0000000..910de8e
--- /dev/null
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
@@ -0,0 +1,84 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.structure.util.partitioner;
+
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GlobalPartitioner implements Partitioner {
+
+    private final GlobalPartition partition;
+
+    public GlobalPartitioner(final Graph graph) {
+        this.partition = new GlobalPartition(graph);
+    }
+
+    @Override
+    public List<Partition> getPartitions() {
+        return Collections.singletonList(this.partition);
+    }
+
+    @Override
+    public Partition getPartition(final Element element) {
+        return this.partition;
+    }
+
+    private class GlobalPartition implements Partition {
+
+        private final Graph graph;
+
+        private GlobalPartition(final Graph graph) {
+            this.graph = graph;
+        }
+
+        @Override
+        public boolean contains(final Element element) {
+            return true;
+        }
+
+        @Override
+        public Iterator<Vertex> vertices(final Object... ids) {
+            return this.graph.vertices(ids);
+        }
+
+        @Override
+        public Iterator<Edge> edges(final Object... ids) {
+            return this.graph.edges(ids);
+        }
+
+        @Override
+        public URI location() {
+            return URI.create("localhost");
+        }
+    }
+}
+

Reply via email to