created a new module called akka-gremlin. It will implement the actor/ package interfaces (yet to be defined in gremlin-core).
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/93d58caf Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/93d58caf Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/93d58caf Branch: refs/heads/TINKERPOP-1564 Commit: 93d58caff76dea3fb1f668ff4e50f1b4ce2e904b Parents: d4dcae2 Author: Marko A. Rodriguez <[email protected]> Authored: Tue Dec 13 05:08:56 2016 -0700 Committer: Marko A. Rodriguez <[email protected]> Committed: Wed Jan 4 05:07:59 2017 -0700 ---------------------------------------------------------------------- akka-gremlin/pom.xml | 82 ++++++++ .../gremlin/akka/process/actor/AkkaActors.java | 53 +++++ .../process/actor/MasterTraversalActor.java | 176 +++++++++++++++++ .../akka/process/actor/TraverserMailbox.java | 132 +++++++++++++ .../process/actor/WorkerTraversalActor.java | 192 +++++++++++++++++++ .../actor/WorkerTraversalSideEffects.java | 147 ++++++++++++++ .../actor/message/BarrierAddMessage.java | 47 +++++ .../actor/message/BarrierDoneMessage.java | 41 ++++ .../actor/message/SideEffectAddMessage.java | 43 +++++ .../process/actor/message/StartMessage.java | 35 ++++ .../actor/message/VoteToHaltMessage.java | 36 ++++ .../process/traversal/step/map/ActorStep.java | 76 ++++++++ .../strategy/decoration/ActorStrategy.java | 83 ++++++++ .../verification/ActorVerificationStrategy.java | 60 ++++++ .../src/main/resources/application.conf | 11 ++ .../process/AkkaActorsProcessStandardTest.java | 33 ++++ .../akka/process/AkkaActorsProvider.java | 140 ++++++++++++++ .../gremlin/akka/process/AkkaPlayTest.java | 82 ++++++++ .../step/util/CollectingBarrierStep.java | 7 +- .../tinkerpop/gremlin/structure/Graph.java | 12 +- .../tinkerpop/gremlin/structure/Partition.java | 73 +++++++ .../gremlin/structure/Partitioner.java | 33 ++++ .../util/partitioner/GlobalPartitioner.java | 84 ++++++++ .../util/partitioner/HashPartitioner.java | 96 ++++++++++ pom.xml | 2 + 25 files changed, 1773 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/pom.xml ---------------------------------------------------------------------- diff --git a/akka-gremlin/pom.xml b/akka-gremlin/pom.xml new file mode 100644 index 0000000..e9f5345 --- /dev/null +++ b/akka-gremlin/pom.xml @@ -0,0 +1,82 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tinkerpop</artifactId> + <groupId>org.apache.tinkerpop</groupId> + <version>3.3.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>akka-gremlin</artifactId> + <name>Apache TinkerPop :: Akka Gremlin</name> + <dependencies> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>gremlin-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_2.11</artifactId> + <version>2.4.14</version> + <exclusions> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.11.8</version> + </dependency> + <!-- TEST --> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>gremlin-test</artifactId> + <version>3.3.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>tinkergraph-gremlin</artifactId> + <version>3.3.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <directory>${basedir}/target</directory> + <finalName>${project.artifactId}-${project.version}</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java new file mode 100644 index 0000000..aa2a048 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class AkkaActors<S, E> { + + public final ActorSystem system; + private TraverserSet<E> results = new TraverserSet<>(); + + public AkkaActors(final Traversal.Admin<S, E> traversal, final Partitioner partitioner) { + this.system = ActorSystem.create("traversal-" + traversal.hashCode()); + this.system.actorOf(Props.create(MasterTraversalActor.class, traversal.clone(), partitioner, this.results), "master"); + } + + public Future<TraverserSet<E>> getResults() { + return CompletableFuture.supplyAsync(() -> { + while (!this.system.isTerminated()) { + + } + return this.results; + }); + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java new file mode 100644 index 0000000..d1a29d8 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Props; +import akka.dispatch.RequiresMessageQueue; +import akka.japi.pf.ReceiveBuilder; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; +import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification.ActorVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; +import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> { + + private final Traversal.Admin<?, ?> traversal; + private final TraversalMatrix<?, ?> matrix; + private final Partitioner partitioner; + private final Map<String, ActorSelection> workers = new HashMap<>(); + private Map<String, Barrier> barriers = new HashMap<>(); + private final TraverserSet<?> results; + private final String leaderWorker; + + public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) { + System.out.println("master[created]: " + self().path()); + final TraversalStrategies strategies = traversal.getStrategies().clone(); + strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class); + strategies.addStrategies(ActorVerificationStrategy.instance()); + traversal.setStrategies(strategies); + traversal.applyStrategies(); + + this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); + System.out.println(this.traversal); + this.matrix = new TraversalMatrix<>(this.traversal); + this.partitioner = partitioner; + this.results = results; + this.initializeWorkers(); + this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode(); + + receive(ReceiveBuilder. + match(Traverser.Admin.class, traverser -> { + this.processTraverser(traverser); + }). + match(BarrierAddMessage.class, barrierMerge -> { + // get the barrier updates from the workers to synchronize against the master barrier + final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId()); + final Step<?, ?> step = (Step) barrier; + GraphComputing.atMaster(step, true); + barrier.addBarrier(barrierMerge.getBarrier()); + this.barriers.put(step.getId(), barrier); + }). + match(SideEffectAddMessage.class, sideEffect -> { + // get the side-effect updates from the workers to generate the master side-effects + this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue()); + }). + match(VoteToHaltMessage.class, voteToHalt -> { + assert !sender().equals(self()); + if (!this.barriers.isEmpty()) { + for (final Barrier barrier : this.barriers.values()) { + final Step<?, ?> step = (Step) barrier; + if (!(barrier instanceof LocalBarrier)) { + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + } else { + this.traversal.getSideEffects().forEach((k, v) -> { + this.broadcast(new SideEffectAddMessage(k, v)); + }); + this.broadcast(new BarrierDoneMessage(barrier)); + barrier.done(); + } + } + this.barriers.clear(); + worker(this.leaderWorker).tell(StartMessage.instance(), self()); + } else { + while (this.traversal.hasNext()) { + this.results.add((Traverser.Admin) this.traversal.nextTraverser()); + } + context().system().terminate(); + } + }).build()); + } + + private void initializeWorkers() { + final List<Partition> partitions = this.partitioner.getPartitions(); + for (final Partition partition : partitions) { + final String workerPathString = "worker-" + partition.hashCode(); + final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), workerPathString); + this.workers.put(workerPathString, context().actorSelection(worker.path())); + } + for (final ActorSelection worker : this.workers.values()) { + worker.tell(StartMessage.instance(), self()); + } + this.workers.clear(); + } + + private void broadcast(final Object message) { + for (final Partition partition : this.partitioner.getPartitions()) { + worker("worker-" + partition.hashCode()).tell(message, self()); + } + } + + private void processTraverser(final Traverser.Admin traverser) { + if (traverser.isHalted() || traverser.get() instanceof Element) { + this.sendTraverser(traverser); + } else { + final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); + GraphComputing.atMaster(step, true); + step.addStart(traverser); + while (step.hasNext()) { + this.processTraverser(step.next()); + } + } + } + + private void sendTraverser(final Traverser.Admin traverser) { + if (traverser.isHalted()) + this.results.add(traverser); + else if (traverser.get() instanceof Element) + worker("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()).tell(traverser, self()); + else + self().tell(traverser, self()); + } + + private ActorSelection worker(final String workerPath) { + ActorSelection worker = this.workers.get(workerPath); + if (null == worker) { + worker = context().actorSelection(workerPath); + this.workers.put(workerPath, worker); + } + return worker; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java new file mode 100644 index 0000000..6a6c0f4 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.Envelope; +import akka.dispatch.MailboxType; +import akka.dispatch.MessageQueue; +import akka.dispatch.ProducesMessageQueue; +import com.typesafe.config.Config; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import scala.Option; + +import java.util.LinkedList; +import java.util.Queue; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> { + + public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics { + private final Queue<Envelope> otherMessages = new LinkedList<>(); + private final TraverserSet<?> traverserMessages = new TraverserSet<>(); + private Envelope haltMessage = null; + private Envelope terminateToken = null; + private final ActorRef owner; + private final Object MUTEX = new Object(); + + public TraverserMessageQueue(final ActorRef owner) { + this.owner = owner; + } + + public void enqueue(final ActorRef receiver, final Envelope handle) { + synchronized (MUTEX) { + if (handle.message() instanceof Traverser.Admin) + this.traverserMessages.offer((Traverser.Admin) handle.message()); + else if (handle.message() instanceof VoteToHaltMessage) { + assert null == this.haltMessage; + this.haltMessage = handle; + } else if (handle.message() instanceof WorkerTraversalActor.Terminate) { + assert null == this.terminateToken; + this.terminateToken = handle; + } else + this.otherMessages.offer(handle); + } + } + + public Envelope dequeue() { + synchronized (MUTEX) { + if (!this.otherMessages.isEmpty()) + return this.otherMessages.poll(); + if (!this.traverserMessages.isEmpty()) + return new Envelope(this.traverserMessages.poll(), this.owner); + else if (null != this.terminateToken) { + final Envelope temp = this.terminateToken; + this.terminateToken = null; + return temp; + } else { + final Envelope temp = this.haltMessage; + this.haltMessage = null; + return temp; + } + } + } + + public int numberOfMessages() { + synchronized (MUTEX) { + return this.otherMessages.size() + this.traverserMessages.size() + (null == this.haltMessage ? 0 : 1) + (null == this.terminateToken ? 0 : 1); + } + } + + public boolean hasMessages() { + synchronized (MUTEX) { + return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || null != this.haltMessage || this.terminateToken != null; + } + } + + public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) { + synchronized (MUTEX) { + for (final Envelope handle : this.otherMessages) { + deadLetters.enqueue(owner, handle); + } + for (final Traverser.Admin<?> traverser : this.traverserMessages) { + deadLetters.enqueue(owner, new Envelope(traverser, this.owner)); + } + if (null != this.haltMessage) { + deadLetters.enqueue(owner, this.haltMessage); + this.haltMessage = null; + } + if (null != this.terminateToken) { + deadLetters.enqueue(owner, this.terminateToken); + this.terminateToken = null; + } + } + } + } + + // This constructor signature must exist, it will be called by Akka + public TraverserMailbox(final ActorSystem.Settings settings, final Config config) { + // put your initialization code here + } + + // The create method is called to create the MessageQueue + public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) { + return new TraverserMessageQueue(owner.isEmpty() ? ActorRef.noSender() : owner.get()); + } + + public static interface TraverserSetSemantics { + + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java new file mode 100644 index 0000000..63eb707 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.dispatch.RequiresMessageQueue; +import akka.japi.pf.ReceiveBuilder; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing; +import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> { + + // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm) + public enum Terminate { + MAYBE, YES, NO + } + + private final TraversalMatrix<?, ?> matrix; + private final Partition localPartition; + private final Partitioner partitioner; + // + private final Map<String, ActorSelection> workers = new HashMap<>(); + private final String neighborWorker; + private boolean isLeader; + private Terminate terminate = null; + private boolean voteToHalt = false; + private Map<String, Barrier> barriers = new HashMap<>(); + + public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) { + System.out.println("worker[created]: " + self().path()); + // set up partition and traversal information + this.localPartition = localPartition; + this.partitioner = partitioner; + final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), context()); + TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); + this.matrix = new TraversalMatrix<>(traversal); + final GraphStep graphStep = (GraphStep) traversal.getStartStep(); + if (0 == graphStep.getIds().length) + ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges); + else { + if (graphStep.returnsVertex()) + ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier( + () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains)); + else + ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier( + () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains)); + } + // create termination ring topology + final int i = this.partitioner.getPartitions().indexOf(this.localPartition); + this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode(); + this.isLeader = i == 0; + + receive(ReceiveBuilder. + match(StartMessage.class, start -> { + // initial message from master that says: "start processing" + final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep(); + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + // internal vote to have in mailbox as final message to process + assert null == this.terminate; + if (this.isLeader) { + this.terminate = Terminate.MAYBE; + self().tell(VoteToHaltMessage.instance(), self()); + } + }). + match(Traverser.Admin.class, traverser -> { + this.processTraverser(traverser); + }). + match(SideEffectAddMessage.class, sideEffect -> { + this.matrix.getTraversal().getSideEffects().set(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue()); + }). + match(Terminate.class, terminate -> { + assert this.isLeader || this.terminate != Terminate.MAYBE; + this.terminate = terminate; + self().tell(VoteToHaltMessage.instance(), self()); + }). + match(BarrierDoneMessage.class, barrierDone -> { + final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierDone.getStepId()); + while (step.hasNext()) { + sendTraverser(step.next()); + } + }). + match(VoteToHaltMessage.class, haltSync -> { + // if there is a barrier and thus, halting at barrier, then process barrier + if (!this.barriers.isEmpty()) { + for (final Barrier barrier : this.barriers.values()) { + while (barrier.hasNextBarrier()) { + master().tell(new BarrierAddMessage(barrier), self()); + } + } + this.barriers.clear(); + this.voteToHalt = false; + } + // use termination token to determine termination condition + if (null != this.terminate) { + if (this.isLeader) { + if (this.voteToHalt && Terminate.YES == this.terminate) + master().tell(VoteToHaltMessage.instance(), self()); + else + worker(this.neighborWorker).tell(Terminate.YES, self()); + } else + worker(this.neighborWorker).tell(this.voteToHalt ? this.terminate : Terminate.NO, self()); + this.terminate = null; + this.voteToHalt = true; + } + }).build() + ); + } + + private void processTraverser(final Traverser.Admin traverser) { + assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); + final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); + if (step instanceof Bypassing) ((Bypassing) step).setBypass(true); + GraphComputing.atMaster(step, false); + step.addStart(traverser); + if (step instanceof Barrier) { + this.barriers.put(step.getId(), (Barrier) step); + } else { + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + } + } + + private void sendTraverser(final Traverser.Admin traverser) { + this.voteToHalt = false; + if (traverser.isHalted()) + master().tell(traverser, self()); + else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) + worker("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()).tell(traverser, self()); + else + self().tell(traverser, self()); + } + + private ActorSelection worker(final String workerPath) { + ActorSelection worker = this.workers.get(workerPath); + if (null == worker) { + worker = context().actorSelection(workerPath); + this.workers.put(workerPath, worker); + } + return worker; + } + + private ActorRef master() { + return context().parent(); + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java new file mode 100644 index 0000000..9c03298 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor; + +import akka.actor.ActorContext; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; + +import java.util.Optional; +import java.util.Set; +import java.util.function.BinaryOperator; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class WorkerTraversalSideEffects implements TraversalSideEffects { + + private TraversalSideEffects sideEffects; + private ActorContext context; + + + private WorkerTraversalSideEffects() { + // for serialization + } + + public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext context) { + this.sideEffects = sideEffects; + this.context = context; + } + + public TraversalSideEffects getSideEffects() { + return this.sideEffects; + } + + @Override + public void set(final String key, final Object value) { + this.sideEffects.set(key, value); + } + + @Override + public <V> V get(final String key) throws IllegalArgumentException { + return this.sideEffects.get(key); + } + + @Override + public void remove(final String key) { + this.sideEffects.remove(key); + } + + @Override + public Set<String> keys() { + return this.sideEffects.keys(); + } + + @Override + public void add(final String key, final Object value) { + this.context.parent().tell(new SideEffectAddMessage(key, value), this.context.self()); + } + + @Override + public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) { + this.sideEffects.register(key, initialValue, reducer); + } + + @Override + public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) { + this.sideEffects.registerIfAbsent(key, initialValue, reducer); + } + + @Override + public <V> BinaryOperator<V> getReducer(final String key) { + return this.sideEffects.getReducer(key); + } + + @Override + public <V> Supplier<V> getSupplier(final String key) { + return this.sideEffects.getSupplier(key); + } + + @Override + @Deprecated + public void registerSupplier(final String key, final Supplier supplier) { + this.sideEffects.registerSupplier(key, supplier); + } + + @Override + @Deprecated + public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) { + return this.sideEffects.getRegisteredSupplier(key); + } + + @Override + public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) { + this.sideEffects.setSack(initialValue, splitOperator, mergeOperator); + } + + @Override + public <S> Supplier<S> getSackInitialValue() { + return this.sideEffects.getSackInitialValue(); + } + + @Override + public <S> UnaryOperator<S> getSackSplitter() { + return this.sideEffects.getSackSplitter(); + } + + @Override + public <S> BinaryOperator<S> getSackMerger() { + return this.sideEffects.getSackMerger(); + } + + @Override + public TraversalSideEffects clone() { + try { + final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone(); + clone.sideEffects = this.sideEffects.clone(); + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + public void mergeInto(final TraversalSideEffects sideEffects) { + this.sideEffects.mergeInto(sideEffects); + } + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java new file mode 100644 index 0000000..4a351c1 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor.message; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class BarrierAddMessage { + + private final Object barrier; + private final String stepId; + + public BarrierAddMessage(final Barrier barrier) { + this.barrier = barrier.nextBarrier(); + this.stepId = ((Step) barrier).getId(); + } + + public Object getBarrier() { + return this.barrier; + } + + public String getStepId() { + return this.stepId; + } + + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java new file mode 100644 index 0000000..208b346 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor.message; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class BarrierDoneMessage { + + private final String stepId; + + public BarrierDoneMessage(final Barrier barrier) { + this.stepId = ((Step) barrier).getId(); + + } + + public String getStepId() { + return this.stepId; + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java new file mode 100644 index 0000000..4a54d97 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor.message; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class SideEffectAddMessage { + + private final String sideEffectKey; + private final Object sideEffect; + + public SideEffectAddMessage(final String sideEffectKey, final Object sideEffect) { + this.sideEffect = sideEffect; + this.sideEffectKey = sideEffectKey; + } + + public String getSideEffectKey() { + return this.sideEffectKey; + } + + public Object getSideEffectValue() { + return this.sideEffect; + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java new file mode 100644 index 0000000..ebc469c --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor.message; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class StartMessage { + + private static final StartMessage INSTANCE = new StartMessage(); + + private StartMessage() { + } + + public static StartMessage instance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java new file mode 100644 index 0000000..8bfa4c9 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor.message; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class VoteToHaltMessage { + + private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage(); + + private VoteToHaltMessage() { + } + + public static VoteToHaltMessage instance() { + return INSTANCE; + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java new file mode 100644 index 0000000..db41493 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.traversal.step.map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ + +import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors; +import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; +import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.util.StringFactory; + +import java.util.NoSuchElementException; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorStep<S, E> extends AbstractStep<E, E> { + + public final Traversal.Admin<S, E> partitionTraversal; + private final Partitioner partitioner; + private boolean first = true; + + public ActorStep(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) { + super(traversal); + this.partitionTraversal = (Traversal.Admin) traversal.clone(); + final TraversalStrategies strategies = this.partitionTraversal.getStrategies().clone(); + strategies.removeStrategies(ActorStrategy.class); + strategies.addStrategies(VertexProgramStrategy.instance()); + this.partitionTraversal.setStrategies(strategies); + this.partitioner = partitioner; + } + + @Override + public String toString() { + return StringFactory.stepString(this, this.partitionTraversal); + } + + @Override + protected Traverser.Admin<E> processNextStart() throws NoSuchElementException { + if (this.first) { + this.first = false; + final AkkaActors<S, E> actors = new AkkaActors<>(this.partitionTraversal, this.partitioner); + try { + actors.getResults().get().forEach(this.starts::add); + } catch (final Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + return this.starts.next(); + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java new file mode 100644 index 0000000..adbc257 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration; + +import org.apache.tinkerpop.gremlin.akka.process.traversal.step.map.ActorStep; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; +import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.Collections; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> + implements TraversalStrategy.DecorationStrategy { + + + private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class); + private static final Set<Class<? extends DecorationStrategy>> POSTS = Collections.singleton(VertexProgramStrategy.class); + + private final Partitioner partitioner; + + public ActorStrategy(final Partitioner partitioner) { + this.partitioner = partitioner; + } + + @Override + public void apply(final Traversal.Admin<?, ?> traversal) { + ReadOnlyStrategy.instance().apply(traversal); + if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty()) + throw new VerificationException("Inject traversal currently not supported", traversal); + + if (!(traversal.getParent() instanceof EmptyStep)) + return; + + final ActorStep<?, ?> actorStep = new ActorStep<>(traversal, this.partitioner); + TraversalHelper.removeAllSteps(traversal); + traversal.addStep(actorStep); + + // validations + assert traversal.getStartStep().equals(actorStep); + assert traversal.getSteps().size() == 1; + assert traversal.getEndStep() == actorStep; + } + + @Override + public Set<Class<? extends DecorationStrategy>> applyPost() { + return POSTS; + } + + @Override + public Set<Class<? extends DecorationStrategy>> applyPrior() { + return PRIORS; + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java new file mode 100644 index 0000000..a9e3f7b --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.structure.Graph; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy { + + private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy(); + + private ActorVerificationStrategy() { + } + + @Override + public void apply(final Traversal.Admin<?, ?> traversal) { + if (!TraversalHelper.onGraphComputer(traversal)) + return; + final boolean globalChild = TraversalHelper.isGlobalChild(traversal); + for (final Step<?, ?> step : traversal.getSteps()) { + // only global children are graph computing + if (globalChild && step instanceof GraphComputing) + ((GraphComputing) step).onGraphComputer(); + + for (String label : step.getLabels()) { + if (Graph.Hidden.isHidden(label)) + step.removeLabel(label); + } + } + } + + public static ActorVerificationStrategy instance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf new file mode 100644 index 0000000..706e3a6 --- /dev/null +++ b/akka-gremlin/src/main/resources/application.conf @@ -0,0 +1,11 @@ +custom-dispatcher { + mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox$TraverserSetSemantics" +} + +akka.actor.mailbox.requirements { + "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox$TraverserSetSemantics" = custom-dispatcher-mailbox +} + +custom-dispatcher-mailbox { + mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java new file mode 100644 index 0000000..2e84bd9 --- /dev/null +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessStandardTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.junit.runner.RunWith; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +@RunWith(ProcessStandardSuite.class) +@GraphProviderClass(provider = AkkaActorsProvider.class, graph = TinkerGraph.class) +public class AkkaActorsProcessStandardTest { +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java new file mode 100644 index 0000000..9158040 --- /dev/null +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.AbstractGraphProvider; +import org.apache.tinkerpop.gremlin.LoadGraphWith; +import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class AkkaActorsProvider extends AbstractGraphProvider { + + protected static final boolean IMPORT_STATICS = new Random().nextBoolean(); + + private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList( + "g_V_outXfollowedByX_group_byXsongTypeX_byXbothE_group_byXlabelX_byXweight_sumXX", + SideEffectTest.Traversals.class.getCanonicalName(), + SubgraphStrategyProcessTest.class.getCanonicalName(), + ProfileTest.Traversals.class.getCanonicalName(), + PartitionStrategyProcessTest.class.getCanonicalName(), + EventStrategyProcessTest.class.getCanonicalName(), + ElementIdStrategyProcessTest.class.getCanonicalName(), + TraversalInterruptionTest.class.getCanonicalName(), + ProgramTest.Traversals.class.getCanonicalName())); + + private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{ + add(TinkerEdge.class); + add(TinkerElement.class); + add(TinkerGraph.class); + add(TinkerGraphVariables.class); + add(TinkerProperty.class); + add(TinkerVertex.class); + add(TinkerVertexProperty.class); + }}; + + @Override + public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, + final LoadGraphWith.GraphData loadGraphWith) { + + final TinkerGraph.DefaultIdManager idManager = selectIdMakerFromGraphData(loadGraphWith); + final String idMaker = (idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? selectIdMakerFromGraphData(loadGraphWith) : idManager).name(); + return new HashMap<String, Object>() {{ + put(Graph.GRAPH, TinkerGraph.class.getName()); + put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker); + put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker); + put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker); + put("skipTest", SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName())); + if (loadGraphWith == LoadGraphWith.GraphData.CREW) + put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name()); + }}; + } + + @Override + public void clear(final Graph graph, final Configuration configuration) throws Exception { + if (graph != null) graph.close(); + } + + @Override + public Set<Class> getImplementations() { + return IMPLEMENTATION; + } + + /** + * Test that load with specific graph data can be configured with a specific id manager as the data type to + * be used in the test for that graph is known. + */ + protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) { + if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY; + if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) + return TinkerGraph.DefaultIdManager.INTEGER; + else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) + return TinkerGraph.DefaultIdManager.INTEGER; + else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) + return TinkerGraph.DefaultIdManager.INTEGER; + else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) + return TinkerGraph.DefaultIdManager.INTEGER; + else + throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name())); + } + +///////////////////////////// +///////////////////////////// +///////////////////////////// + + @Override + public GraphTraversalSource traversal(final Graph graph) { + if ((Boolean) graph.configuration().getProperty("skipTest")) + return graph.traversal(); + //throw new VerificationException("This test current does not work with Gremlin-Python", EmptyTraversal.instance()); + else { + final GraphTraversalSource g = graph.traversal(); + return g.withStrategies(new ActorStrategy(new HashPartitioner(graph.partitioner(), 3))); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java new file mode 100644 index 0000000..b064331 --- /dev/null +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process; + +import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.junit.Test; + +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class AkkaPlayTest { + + @Test + public void testPlay1() throws Exception { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); + GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(new HashPartitioner(graph.partitioner(), 3))); + System.out.println(g.V(1, 2).union(outE().count(), inE().count(), (Traversal) outE().values("weight").sum()).toList()); + //3, 1.9, 1 + /*for (int i = 0; i < 10000; i++) { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); + final GraphTraversalSource g = graph.traversal().withComputer(); + final List<Pair<Integer, Traversal.Admin<?, ?>>> traversals = Arrays.asList( + // match() works + Pair.with(6, g.V().match( + as("a").out("created").as("b"), + as("b").in("created").as("c"), + as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin()), + // side-effects work + Pair.with(3, g.V().repeat(both()).times(2). + groupCount("a").by("name"). + cap("a").unfold().order().by(Column.values, Order.decr).limit(3).asAdmin()), + // barriers work and beyond the local star graph works + Pair.with(1, g.V().repeat(both()).times(2).hasLabel("person"). + group(). + by("name"). + by(out("created").values("name").dedup().fold()).asAdmin()), + // no results works + Pair.with(0, g.V().out("blah").asAdmin()) + ); + for (final Pair<Integer,Traversal.Admin<?, ?>> pair : traversals) { + final Integer count = pair.getValue0(); + final Traversal.Admin<?,?> traversal = pair.getValue1(); + System.out.println("EXECUTING: " + traversal.getBytecode()); + final TinkerActorSystem<?,?> actors = new TinkerActorSystem<>(traversal.clone(),new HashPartitioner(graph.partitioner(), 3)); + System.out.println(IteratorUtils.asList(actors.getResults().get())); + if(IteratorUtils.count(actors.getResults().get()) != count) + throw new IllegalStateException(); + System.out.println("//////////////////////////////////\n"); + } + } + }*/ + + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java index f9c85a2..e4e2cb5 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.util.Collections; import java.util.NoSuchElementException; @@ -96,8 +97,10 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem @Override public void addBarrier(final TraverserSet<S> barrier) { - this.traverserSet = barrier; - this.traverserSet.forEach(traverser -> traverser.setSideEffects(this.getTraversal().getSideEffects())); + IteratorUtils.removeOnNext(barrier.iterator()).forEachRemaining(traverser -> { + traverser.setSideEffects(this.getTraversal().getSideEffects()); + this.starts.add(traverser); + }); this.barrierConsumer(this.traverserSet); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java index 2db37d3..61b17a3 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java @@ -30,6 +30,7 @@ import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.util.FeatureDescriptor; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.apache.tinkerpop.gremlin.structure.util.Host; +import org.apache.tinkerpop.gremlin.structure.util.partitioner.GlobalPartitioner; import org.javatuples.Pair; import java.lang.annotation.ElementType; @@ -331,11 +332,20 @@ public interface Graph extends AutoCloseable, Host { * Whatever configuration was passed to {@link GraphFactory#open(org.apache.commons.configuration.Configuration)} * is what should be returned by this method. * - * @return the configuration used during graph construction. + * @return the configuration used during graph construction */ public Configuration configuration(); /** + * Get the {@link Partitioner} describing how the graph's elements are partitioned across a cluster. + * + * @return the partitioner of the graph + */ + public default Partitioner partitioner() { + return new GlobalPartitioner(this); + } + + /** * Graph variables are a set of key/value pairs associated with the graph. The keys are String and the values * are Objects. */ http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java new file mode 100644 index 0000000..12faca9 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.structure; + +import java.net.URI; +import java.util.Iterator; + +/** + * A {@code Partition} represents a physical or logical split of the underlying {@link Graph} structure. + * In distributed graph systems, a physical partition denotes which vertices/edges are in the subgraph of the underyling + * physical machine. In a logical partition, a physical partition may be split amongst multiple threads and thus, + * while isolated logically, they are united physically. + * + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface Partition { + + /** + * Whether or not this element was, is, or will be contained in this partition. + * Containment is not whether the element currently exists, but instead whether if it did exist, would it be + * contained in this partition. + * + * @param element the element to check for containment + * @return whether the element would be contained in this partition + */ + public boolean contains(final Element element); + + /** + * The current existing vertices contained in this partition. + * + * @param ids filtering to only those ids provided + * @return an iterator of vertices contained in the partition + */ + public Iterator<Vertex> vertices(final Object... ids); + + /** + * The current existing edges contained in this partition. + * + * @param ids filtering to only those ids provided + * @return an iterator of edges contained in the partition + */ + public Iterator<Edge> edges(final Object... ids); + + /** + * Get the {@link URI} location of the partition. + * + * @return the location of the partition + */ + public URI location(); + + public static interface PhysicalPartition extends Partition { + } + + public static interface LogicalPartition extends Partition { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java new file mode 100644 index 0000000..1d4aae1 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.structure; + +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface Partitioner { + + public List<Partition> getPartitions(); + + public Partition getPartition(final Element element); + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/93d58caf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java new file mode 100644 index 0000000..910de8e --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.structure.util.partitioner; + +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import java.net.URI; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class GlobalPartitioner implements Partitioner { + + private final GlobalPartition partition; + + public GlobalPartitioner(final Graph graph) { + this.partition = new GlobalPartition(graph); + } + + @Override + public List<Partition> getPartitions() { + return Collections.singletonList(this.partition); + } + + @Override + public Partition getPartition(final Element element) { + return this.partition; + } + + private class GlobalPartition implements Partition { + + private final Graph graph; + + private GlobalPartition(final Graph graph) { + this.graph = graph; + } + + @Override + public boolean contains(final Element element) { + return true; + } + + @Override + public Iterator<Vertex> vertices(final Object... ids) { + return this.graph.vertices(ids); + } + + @Override + public Iterator<Edge> edges(final Object... ids) { + return this.graph.edges(ids); + } + + @Override + public URI location() { + return URI.create("localhost"); + } + } +} +
