refactored GraphActors packaging -- its not actor/, but actors/. JavaDoc and various cleanups. Also, about to NOT serialize a traversal but instead use Bytecode. Next push will do this with TraveraslVertexProgram.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/243ab6a7 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/243ab6a7 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/243ab6a7 Branch: refs/heads/TINKERPOP-1564 Commit: 243ab6a7665cdf7d27e1bda387f5f6379c015abc Parents: 9852a60 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Jan 4 08:01:29 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 13:01:41 2017 -0700 ---------------------------------------------------------------------- .../gremlin/akka/jsr223/AkkaGremlinPlugin.java | 2 +- .../akka/process/actor/ActorMailbox.java | 141 --------------- .../akka/process/actor/AkkaGraphActors.java | 130 -------------- .../gremlin/akka/process/actor/MasterActor.java | 117 ------------ .../gremlin/akka/process/actor/WorkerActor.java | 112 ------------ .../akka/process/actors/ActorMailbox.java | 141 +++++++++++++++ .../akka/process/actors/AkkaGraphActors.java | 130 ++++++++++++++ .../akka/process/actors/MasterActor.java | 117 ++++++++++++ .../akka/process/actors/WorkerActor.java | 112 ++++++++++++ .../src/main/resources/application.conf | 6 +- .../process/AkkaActorsProcessActorsTest.java | 34 ---- .../akka/process/AkkaActorsProvider.java | 158 ---------------- .../gremlin/akka/process/AkkaPlayTest.java | 89 --------- .../actors/AkkaActorsProcessActorsTest.java | 33 ++++ .../akka/process/actors/AkkaActorsProvider.java | 158 ++++++++++++++++ .../akka/process/actors/AkkaPlayTest.java | 89 +++++++++ .../tinkerpop/gremlin/process/actor/Actor.java | 95 ---------- .../gremlin/process/actor/ActorProgram.java | 145 --------------- .../gremlin/process/actor/ActorsResult.java | 30 ---- .../gremlin/process/actor/Address.java | 76 -------- .../gremlin/process/actor/GraphActors.java | 98 ---------- .../actor/traversal/TraversalActorProgram.java | 111 ------------ .../actor/traversal/TraversalMasterProgram.java | 179 ------------------- .../actor/traversal/TraversalWorkerProgram.java | 170 ------------------ .../traversal/WorkerTraversalSideEffects.java | 148 --------------- .../traversal/message/BarrierAddMessage.java | 47 ----- .../traversal/message/BarrierDoneMessage.java | 41 ----- .../traversal/message/SideEffectAddMessage.java | 43 ----- .../traversal/message/SideEffectSetMessage.java | 42 ----- .../actor/traversal/message/StartMessage.java | 35 ---- .../actor/traversal/message/Terminate.java | 28 --- .../step/map/TraversalActorProgramStep.java | 73 -------- .../decoration/ActorProgramStrategy.java | 94 ---------- .../verification/ActorVerificationStrategy.java | 55 ------ .../process/actor/util/DefaultActorsResult.java | 42 ----- .../process/actor/util/GraphActorsHelper.java | 48 ----- .../tinkerpop/gremlin/process/actors/Actor.java | 95 ++++++++++ .../gremlin/process/actors/ActorProgram.java | 145 +++++++++++++++ .../gremlin/process/actors/ActorsResult.java | 30 ++++ .../gremlin/process/actors/Address.java | 76 ++++++++ .../gremlin/process/actors/GraphActors.java | 107 +++++++++++ .../actors/traversal/TraversalActorProgram.java | 129 +++++++++++++ .../traversal/TraversalMasterProgram.java | 179 +++++++++++++++++++ .../traversal/TraversalWorkerProgram.java | 170 ++++++++++++++++++ .../traversal/WorkerTraversalSideEffects.java | 148 +++++++++++++++ .../traversal/message/BarrierAddMessage.java | 47 +++++ .../traversal/message/BarrierDoneMessage.java | 41 +++++ .../traversal/message/SideEffectAddMessage.java | 43 +++++ .../traversal/message/SideEffectSetMessage.java | 42 +++++ .../actors/traversal/message/StartMessage.java | 35 ++++ .../actors/traversal/message/Terminate.java | 28 +++ .../step/map/TraversalActorProgramStep.java | 73 ++++++++ .../decoration/ActorProgramStrategy.java | 94 ++++++++++ .../verification/ActorVerificationStrategy.java | 48 +++++ .../actors/util/DefaultActorsResult.java | 42 +++++ .../process/actors/util/GraphActorsHelper.java | 48 +++++ .../process/traversal/TraversalStrategies.java | 2 +- .../gremlin/structure/util/StringFactory.java | 2 +- .../apache/tinkerpop/gremlin/GraphManager.java | 2 +- .../apache/tinkerpop/gremlin/GraphProvider.java | 2 +- .../gremlin/process/actors/GraphActorsTest.java | 1 - 61 files changed, 2408 insertions(+), 2390 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java index 049c5b7..5c06bff 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java @@ -19,7 +19,7 @@ package org.apache.tinkerpop.gremlin.akka.jsr223; -import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaGraphActors; +import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors; import org.apache.tinkerpop.gremlin.jsr223.AbstractGremlinPlugin; import org.apache.tinkerpop.gremlin.jsr223.DefaultImportCustomizer; import org.apache.tinkerpop.gremlin.jsr223.ImportCustomizer; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java deleted file mode 100644 index c8e5fde..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.Envelope; -import akka.dispatch.MailboxType; -import akka.dispatch.MessageQueue; -import akka.dispatch.ProducesMessageQueue; -import com.typesafe.config.Config; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; -import scala.Option; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ActorMailbox implements MailboxType, ProducesMessageQueue<ActorMailbox.ActorMessageQueue> { - - private final List<Class> messagePriorities = new ArrayList<>(); - - public static class ActorMessageQueue implements MessageQueue, ActorSemantics { - private final List<Class> messagePriorities; - private final List<Queue> messages; - private final Object MUTEX = new Object(); - - public ActorMessageQueue(final List<Class> messagePriorities) { - this.messagePriorities = messagePriorities; - this.messages = new ArrayList<>(this.messagePriorities.size()); - for (final Class clazz : this.messagePriorities) { - final Queue queue; - if (Traverser.class.isAssignableFrom(clazz)) - queue = new TraverserSet<>(); - else - queue = new LinkedList<>(); - this.messages.add(queue); - } - } - - public void enqueue(final ActorRef receiver, final Envelope handle) { - synchronized (MUTEX) { - final Object message = handle.message(); - for (int i = 0; i < this.messagePriorities.size(); i++) { - final Class clazz = this.messagePriorities.get(i); - if (clazz.isInstance(message)) { - this.messages.get(i).offer(message instanceof Traverser ? message : handle); - return; - } - } - throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass()); - } - } - - public Envelope dequeue() { - synchronized (MUTEX) { - for (final Queue queue : this.messages) { - if (!queue.isEmpty()) { - final Object m = queue.poll(); - return m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m; - } - } - return null; - } - } - - public int numberOfMessages() { - synchronized (MUTEX) { - int counter = 0; - for (final Queue queue : this.messages) { - counter = counter + queue.size(); - } - return counter; - } - } - - public boolean hasMessages() { - synchronized (MUTEX) { - for (final Queue queue : this.messages) { - if (!queue.isEmpty()) - return true; - } - return false; - } - } - - public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) { - synchronized (MUTEX) { - for (final Queue queue : this.messages) { - while (!queue.isEmpty()) { - final Object m = queue.poll(); - deadLetters.enqueue(owner, m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m); - } - } - } - } - } - - // This constructor signature must exist, it will be called by Akka - public ActorMailbox(final ActorSystem.Settings settings, final Config config) { - try { - final String[] messages = ((String) settings.config().getAnyRef("message-priorities")).replace("[", "").replace("]", "").split(","); - for (final String clazz : messages) { - this.messagePriorities.add(Class.forName(clazz.trim())); - } - } catch (final ClassNotFoundException e) { - throw new IllegalArgumentException(e.getMessage(), e); - } - } - - // The create method is called to create the MessageQueue - public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) { - return new ActorMessageQueue(this.messagePriorities); - } - - public static interface ActorSemantics { - - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java deleted file mode 100644 index bc692c0..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor; - -import akka.actor.ActorSystem; -import akka.actor.Props; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationUtils; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; -import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; -import org.apache.tinkerpop.gremlin.process.actor.util.DefaultActorsResult; -import org.apache.tinkerpop.gremlin.process.actor.util.GraphActorsHelper; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.Partitioner; -import org.apache.tinkerpop.gremlin.structure.util.StringFactory; -import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.stream.Collectors; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class AkkaGraphActors<R> implements GraphActors<R> { - - private ActorProgram<R> actorProgram; - private int workers = 1; - private Configuration configuration; - private boolean executed = false; - - private AkkaGraphActors(final Configuration configuration) { - this.configuration = new BaseConfiguration(); - ConfigurationUtils.copy(configuration, this.configuration); - this.configuration.setProperty(GRAPH_ACTORS, AkkaGraphActors.class.getCanonicalName()); - GraphActorsHelper.configure(this, this.configuration); - } - - @Override - public String toString() { - return StringFactory.graphActorsString(this); - } - - @Override - public GraphActors<R> program(final ActorProgram<R> actorProgram) { - this.actorProgram = actorProgram; - return this; - } - - @Override - public GraphActors<R> workers(final int workers) { - this.workers = workers; - this.configuration.setProperty(GRAPH_ACTORS_WORKERS, workers); - return this; - } - - @Override - public GraphActors<R> configure(final String key, final Object value) { - this.configuration.setProperty(key, value); - return this; - } - - @Override - public Future<R> submit(final Graph graph) { - if (this.executed) - throw new IllegalStateException("Can not execute twice"); - this.executed = true; - final Config config = ConfigFactory.defaultApplication().withValue("message-priorities", - ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities(). - orElse(Collections.singletonList(Object.class)). - stream(). - map(Class::getCanonicalName). - collect(Collectors.toList()).toString())); - final ActorSystem system = ActorSystem.create("traversal-" + UUID.randomUUID(), config); - final ActorsResult<R> result = new DefaultActorsResult<>(); - final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers); - try { - new Address.Master(system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost()); - } catch (final UnknownHostException e) { - throw new IllegalStateException(e.getMessage(), e); - } - return CompletableFuture.supplyAsync(() -> { - while (!system.isTerminated()) { - - } - return result.getResult(); - }); - } - - @Override - public Configuration configuration() { - return this.configuration; - } - - public static AkkaGraphActors open(final Configuration configuration) { - return new AkkaGraphActors(configuration); - } - - public static AkkaGraphActors open() { - return new AkkaGraphActors(new BaseConfiguration()); - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java deleted file mode 100644 index 0173a8f..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor; - -import akka.actor.AbstractActor; -import akka.actor.ActorSelection; -import akka.actor.Props; -import akka.dispatch.RequiresMessageQueue; -import akka.japi.pf.ReceiveBuilder; -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; -import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Partitioner; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class MasterActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Master { - - private final ActorProgram.Master masterProgram; - private final Address.Master master; - private final List<Address.Worker> workers; - private final Map<Address, ActorSelection> actors = new HashMap<>(); - private final ActorsResult<?> result; - private final Partitioner partitioner; - - public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) { - this.partitioner = partitioner; - this.result = result; - try { - this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost()); - } catch (final UnknownHostException e) { - throw new IllegalStateException(e.getMessage(), e); - } - this.workers = new ArrayList<>(); - final List<Partition> partitions = partitioner.getPartitions(); - for (final Partition partition : partitions) { - final String workerPathString = "worker-" + partition.id(); - this.workers.add(new Address.Worker(workerPathString, partition.location())); - context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString); - } - this.masterProgram = program.createMasterProgram(this); - receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build()); - } - - @Override - public void preStart() { - this.masterProgram.setup(); - } - - @Override - public void postStop() { - this.masterProgram.terminate(); - } - - @Override - public <M> void send(final Address toActor, final M message) { - ActorSelection actor = this.actors.get(toActor); - if (null == actor) { - actor = context().actorSelection(toActor.getId()); - this.actors.put(toActor, actor); - } - actor.tell(message, self()); - } - - @Override - public List<Address.Worker> workers() { - return this.workers; - } - - @Override - public Partitioner partitioner() { - return this.partitioner; - } - - @Override - public Address.Master address() { - return this.master; - } - - @Override - public void close() { - context().system().terminate(); - } - - @Override - public <R> ActorsResult<R> result() { - return (ActorsResult<R>) this.result; - } - -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java deleted file mode 100644 index 27f942a..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor; - -import akka.actor.AbstractActor; -import akka.actor.ActorSelection; -import akka.dispatch.RequiresMessageQueue; -import akka.japi.pf.ReceiveBuilder; -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Partitioner; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Worker { - - private final ActorProgram.Worker workerProgram; - private final Partition localPartition; - private final Partitioner partitioner; - private final Address.Worker self; - private final Address.Master master; - private final List<Address.Worker> workers; - private final Map<Address, ActorSelection> actors = new HashMap<>(); - - public WorkerActor(final ActorProgram program, final Address.Master master, final Partition localPartition, final Partitioner partitioner) { - this.localPartition = localPartition; - this.partitioner = partitioner; - this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location()); - this.master = master; - this.workers = new ArrayList<>(); - for (final Partition partition : partitioner.getPartitions()) { - this.workers.add(new Address.Worker(this.createWorkerAddress(partition), partition.location())); - } - this.workerProgram = program.createWorkerProgram(this); - receive(ReceiveBuilder.matchAny(this.workerProgram::execute).build()); - } - - @Override - public void preStart() { - this.workerProgram.setup(); - } - - @Override - public void postStop() { - this.workerProgram.terminate(); - } - - @Override - public <M> void send(final Address toActor, final M message) { - ActorSelection actor = this.actors.get(toActor); - if (null == actor) { - actor = context().actorSelection(toActor.getId()); - this.actors.put(toActor, actor); - } - actor.tell(message, self()); - } - - @Override - public List<Address.Worker> workers() { - return this.workers; - } - - @Override - public Partition partition() { - return this.localPartition; - } - - @Override - public Partitioner partitioner() { - return this.partitioner; - } - - @Override - public Address.Worker address() { - return this.self; - } - - @Override - public Address.Master master() { - return this.master; - } - - private String createWorkerAddress(final Partition partition) { - return "../worker-" + partition.id(); - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java new file mode 100644 index 0000000..8087038 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.Envelope; +import akka.dispatch.MailboxType; +import akka.dispatch.MessageQueue; +import akka.dispatch.ProducesMessageQueue; +import com.typesafe.config.Config; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import scala.Option; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorMailbox implements MailboxType, ProducesMessageQueue<ActorMailbox.ActorMessageQueue> { + + private final List<Class> messagePriorities = new ArrayList<>(); + + public static class ActorMessageQueue implements MessageQueue, ActorSemantics { + private final List<Class> messagePriorities; + private final List<Queue> messages; + private final Object MUTEX = new Object(); + + public ActorMessageQueue(final List<Class> messagePriorities) { + this.messagePriorities = messagePriorities; + this.messages = new ArrayList<>(this.messagePriorities.size()); + for (final Class clazz : this.messagePriorities) { + final Queue queue; + if (Traverser.class.isAssignableFrom(clazz)) + queue = new TraverserSet<>(); + else + queue = new LinkedList<>(); + this.messages.add(queue); + } + } + + public void enqueue(final ActorRef receiver, final Envelope handle) { + synchronized (MUTEX) { + final Object message = handle.message(); + for (int i = 0; i < this.messagePriorities.size(); i++) { + final Class clazz = this.messagePriorities.get(i); + if (clazz.isInstance(message)) { + this.messages.get(i).offer(message instanceof Traverser ? message : handle); + return; + } + } + throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass()); + } + } + + public Envelope dequeue() { + synchronized (MUTEX) { + for (final Queue queue : this.messages) { + if (!queue.isEmpty()) { + final Object m = queue.poll(); + return m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m; + } + } + return null; + } + } + + public int numberOfMessages() { + synchronized (MUTEX) { + int counter = 0; + for (final Queue queue : this.messages) { + counter = counter + queue.size(); + } + return counter; + } + } + + public boolean hasMessages() { + synchronized (MUTEX) { + for (final Queue queue : this.messages) { + if (!queue.isEmpty()) + return true; + } + return false; + } + } + + public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) { + synchronized (MUTEX) { + for (final Queue queue : this.messages) { + while (!queue.isEmpty()) { + final Object m = queue.poll(); + deadLetters.enqueue(owner, m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m); + } + } + } + } + } + + // This constructor signature must exist, it will be called by Akka + public ActorMailbox(final ActorSystem.Settings settings, final Config config) { + try { + final String[] messages = ((String) settings.config().getAnyRef("message-priorities")).replace("[", "").replace("]", "").split(","); + for (final String clazz : messages) { + this.messagePriorities.add(Class.forName(clazz.trim())); + } + } catch (final ClassNotFoundException e) { + throw new IllegalArgumentException(e.getMessage(), e); + } + } + + // The create method is called to create the MessageQueue + public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) { + return new ActorMessageQueue(this.messagePriorities); + } + + public static interface ActorSemantics { + + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java new file mode 100644 index 0000000..3bd5fa6 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationUtils; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; +import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; +import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult; +import org.apache.tinkerpop.gremlin.process.actors.util.GraphActorsHelper; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.util.StringFactory; +import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class AkkaGraphActors<R> implements GraphActors<R> { + + private ActorProgram actorProgram; + private int workers = 1; + private Configuration configuration; + private boolean executed = false; + + private AkkaGraphActors(final Configuration configuration) { + this.configuration = new BaseConfiguration(); + ConfigurationUtils.copy(configuration, this.configuration); + this.configuration.setProperty(GRAPH_ACTORS, AkkaGraphActors.class.getCanonicalName()); + GraphActorsHelper.configure(this, this.configuration); + } + + @Override + public String toString() { + return StringFactory.graphActorsString(this); + } + + @Override + public GraphActors<R> program(final ActorProgram actorProgram) { + this.actorProgram = actorProgram; + return this; + } + + @Override + public GraphActors<R> workers(final int workers) { + this.workers = workers; + this.configuration.setProperty(GRAPH_ACTORS_WORKERS, workers); + return this; + } + + @Override + public GraphActors<R> configure(final String key, final Object value) { + this.configuration.setProperty(key, value); + return this; + } + + @Override + public Future<R> submit(final Graph graph) { + if (this.executed) + throw new IllegalStateException("Can not execute twice"); + this.executed = true; + final Config config = ConfigFactory.defaultApplication().withValue("message-priorities", + ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities(). + orElse(Collections.singletonList(Object.class)). + stream(). + map(Class::getCanonicalName). + collect(Collectors.toList()).toString())); + final ActorSystem system = ActorSystem.create("traversal-" + UUID.randomUUID(), config); + final ActorsResult<R> result = new DefaultActorsResult<>(); + final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers); + try { + new Address.Master(system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost()); + } catch (final UnknownHostException e) { + throw new IllegalStateException(e.getMessage(), e); + } + return CompletableFuture.supplyAsync(() -> { + while (!system.isTerminated()) { + + } + return result.getResult(); + }); + } + + @Override + public Configuration configuration() { + return this.configuration; + } + + public static AkkaGraphActors open(final Configuration configuration) { + return new AkkaGraphActors(configuration); + } + + public static AkkaGraphActors open() { + return new AkkaGraphActors(new BaseConfiguration()); + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java new file mode 100644 index 0000000..97951a8 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import akka.actor.AbstractActor; +import akka.actor.ActorSelection; +import akka.actor.Props; +import akka.dispatch.RequiresMessageQueue; +import akka.japi.pf.ReceiveBuilder; +import org.apache.tinkerpop.gremlin.process.actors.Actor; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; +import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class MasterActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Master { + + private final ActorProgram.Master masterProgram; + private final Address.Master master; + private final List<Address.Worker> workers; + private final Map<Address, ActorSelection> actors = new HashMap<>(); + private final ActorsResult<?> result; + private final Partitioner partitioner; + + public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) { + this.partitioner = partitioner; + this.result = result; + try { + this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost()); + } catch (final UnknownHostException e) { + throw new IllegalStateException(e.getMessage(), e); + } + this.workers = new ArrayList<>(); + final List<Partition> partitions = partitioner.getPartitions(); + for (final Partition partition : partitions) { + final String workerPathString = "worker-" + partition.id(); + this.workers.add(new Address.Worker(workerPathString, partition.location())); + context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString); + } + this.masterProgram = program.createMasterProgram(this); + receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build()); + } + + @Override + public void preStart() { + this.masterProgram.setup(); + } + + @Override + public void postStop() { + this.masterProgram.terminate(); + } + + @Override + public <M> void send(final Address toActor, final M message) { + ActorSelection actor = this.actors.get(toActor); + if (null == actor) { + actor = context().actorSelection(toActor.getId()); + this.actors.put(toActor, actor); + } + actor.tell(message, self()); + } + + @Override + public List<Address.Worker> workers() { + return this.workers; + } + + @Override + public Partitioner partitioner() { + return this.partitioner; + } + + @Override + public Address.Master address() { + return this.master; + } + + @Override + public void close() { + context().system().terminate(); + } + + @Override + public <R> ActorsResult<R> result() { + return (ActorsResult<R>) this.result; + } + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java new file mode 100644 index 0000000..7520ce4 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import akka.actor.AbstractActor; +import akka.actor.ActorSelection; +import akka.dispatch.RequiresMessageQueue; +import akka.japi.pf.ReceiveBuilder; +import org.apache.tinkerpop.gremlin.process.actors.Actor; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Worker { + + private final ActorProgram.Worker workerProgram; + private final Partition localPartition; + private final Partitioner partitioner; + private final Address.Worker self; + private final Address.Master master; + private final List<Address.Worker> workers; + private final Map<Address, ActorSelection> actors = new HashMap<>(); + + public WorkerActor(final ActorProgram program, final Address.Master master, final Partition localPartition, final Partitioner partitioner) { + this.localPartition = localPartition; + this.partitioner = partitioner; + this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location()); + this.master = master; + this.workers = new ArrayList<>(); + for (final Partition partition : partitioner.getPartitions()) { + this.workers.add(new Address.Worker(this.createWorkerAddress(partition), partition.location())); + } + this.workerProgram = program.createWorkerProgram(this); + receive(ReceiveBuilder.matchAny(this.workerProgram::execute).build()); + } + + @Override + public void preStart() { + this.workerProgram.setup(); + } + + @Override + public void postStop() { + this.workerProgram.terminate(); + } + + @Override + public <M> void send(final Address toActor, final M message) { + ActorSelection actor = this.actors.get(toActor); + if (null == actor) { + actor = context().actorSelection(toActor.getId()); + this.actors.put(toActor, actor); + } + actor.tell(message, self()); + } + + @Override + public List<Address.Worker> workers() { + return this.workers; + } + + @Override + public Partition partition() { + return this.localPartition; + } + + @Override + public Partitioner partitioner() { + return this.partitioner; + } + + @Override + public Address.Worker address() { + return this.self; + } + + @Override + public Address.Master master() { + return this.master; + } + + private String createWorkerAddress(final Partition partition) { + return "../worker-" + partition.id(); + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf index 7ced92c..7ee599a 100644 --- a/akka-gremlin/src/main/resources/application.conf +++ b/akka-gremlin/src/main/resources/application.conf @@ -1,9 +1,9 @@ custom-dispatcher { - mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actor.ActorMailbox$ActorSemantics" + mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" } akka.actor.mailbox.requirements { - "org.apache.tinkerpop.gremlin.akka.process.actor.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox + "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox } akka { @@ -11,5 +11,5 @@ akka { } custom-dispatcher-mailbox { - mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actor.ActorMailbox" + mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java deleted file mode 100644 index 2c1aa57..0000000 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process; - -import org.apache.tinkerpop.gremlin.GraphProviderClass; -import org.apache.tinkerpop.gremlin.process.ProcessActorsSuite; -import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.junit.runner.RunWith; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -@RunWith(ProcessActorsSuite.class) -@GraphProviderClass(provider = AkkaActorsProvider.class, graph = TinkerGraph.class) -public class AkkaActorsProcessActorsTest { -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java deleted file mode 100644 index 6756e0c..0000000 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process; - -import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.AbstractGraphProvider; -import org.apache.tinkerpop.gremlin.LoadGraphWith; -import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaGraphActors; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; -import org.apache.tinkerpop.gremlin.process.traversal.step.ComplexTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.VertexProperty; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Random; -import java.util.Set; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public class AkkaActorsProvider extends AbstractGraphProvider { - - private static final Random RANDOM = new Random(); - - private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList( - "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name", - "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path", - "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack", - "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX", - GraphTest.Traversals.class.getCanonicalName(), - GroupTest.Traversals.class.getCanonicalName(), - ComplexTest.Traversals.class.getCanonicalName(), - SubgraphTest.Traversals.class.getCanonicalName(), - SideEffectTest.Traversals.class.getCanonicalName(), - SubgraphStrategyProcessTest.class.getCanonicalName(), - ProfileTest.Traversals.class.getCanonicalName(), - PartitionStrategyProcessTest.class.getCanonicalName(), - EventStrategyProcessTest.class.getCanonicalName(), - ElementIdStrategyProcessTest.class.getCanonicalName(), - TraversalInterruptionTest.class.getCanonicalName(), - ProgramTest.Traversals.class.getCanonicalName())); - - private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{ - add(TinkerEdge.class); - add(TinkerElement.class); - add(TinkerGraph.class); - add(TinkerGraphVariables.class); - add(TinkerProperty.class); - add(TinkerVertex.class); - add(TinkerVertexProperty.class); - }}; - - @Override - public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, - final LoadGraphWith.GraphData loadGraphWith) { - - final TinkerGraph.DefaultIdManager idManager = selectIdMakerFromGraphData(loadGraphWith); - final String idMaker = (idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? selectIdMakerFromGraphData(loadGraphWith) : idManager).name(); - return new HashMap<String, Object>() {{ - put(Graph.GRAPH, TinkerGraph.class.getName()); - put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker); - put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker); - put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker); - put("skipTest", SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName())); - if (loadGraphWith == LoadGraphWith.GraphData.CREW) - put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name()); - }}; - } - - @Override - public void clear(final Graph graph, final Configuration configuration) throws Exception { - if (graph != null) graph.close(); - } - - @Override - public Set<Class> getImplementations() { - return IMPLEMENTATION; - } - - /** - * Test that load with specific graph data can be configured with a specific id manager as the data type to - * be used in the test for that graph is known. - */ - protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) { - if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY; - if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) - return TinkerGraph.DefaultIdManager.INTEGER; - else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) - return TinkerGraph.DefaultIdManager.INTEGER; - else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) - return TinkerGraph.DefaultIdManager.INTEGER; - else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) - return TinkerGraph.DefaultIdManager.INTEGER; - else - throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name())); - } - -///////////////////////////// -///////////////////////////// -///////////////////////////// - - @Override - public GraphTraversalSource traversal(final Graph graph) { - if ((Boolean) graph.configuration().getProperty("skipTest")) - return graph.traversal(); - //throw new VerificationException("This test current does not work with Gremlin-Python", EmptyTraversal.instance()); - else { - final GraphTraversalSource g = graph.traversal(); - return RANDOM.nextBoolean() ? - g.withProcessor(AkkaGraphActors.open().workers(new Random().nextInt(15) + 1)) : - g.withProcessor(GraphActors.open(AkkaGraphActors.class)); - } - } - - @Override - public GraphActors getGraphActors(final Graph graph) { - return AkkaGraphActors.open().workers(new Random().nextInt(15) + 1); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java deleted file mode 100644 index df40748..0000000 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process; - -import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaGraphActors; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.junit.Ignore; -import org.junit.Test; - -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public class AkkaPlayTest { - - @Test - @Ignore - public void testPlay1() throws Exception { - final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); - GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3)); - // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); - - for (int i = 0; i < 1000; i++) { - if (12l != g.V().union(out(), in()).values("name").count().next()) - System.out.println(i); - } - - //3, 1.9, 1 - /*for (int i = 0; i < 10000; i++) { - final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); - final GraphTraversalSource g = graph.traversal().withComputer(); - final List<Pair<Integer, Traversal.Admin<?, ?>>> traversals = Arrays.asList( - // match() works - Pair.with(6, g.V().match( - as("a").out("created").as("b"), - as("b").in("created").as("c"), - as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin()), - // side-effects work - Pair.with(3, g.V().repeat(both()).times(2). - groupCount("a").by("name"). - cap("a").unfold().order().by(Column.values, Order.decr).limit(3).asAdmin()), - // barriers work and beyond the local star graph works - Pair.with(1, g.V().repeat(both()).times(2).hasLabel("person"). - group(). - by("name"). - by(out("created").values("name").dedup().fold()).asAdmin()), - // no results works - Pair.with(0, g.V().out("blah").asAdmin()) - ); - for (final Pair<Integer,Traversal.Admin<?, ?>> pair : traversals) { - final Integer count = pair.getValue0(); - final Traversal.Admin<?,?> traversal = pair.getValue1(); - System.out.println("EXECUTING: " + traversal.getBytecode()); - final TinkerActorSystem<?,?> actors = new TinkerActorSystem<>(traversal.clone(),new HashPartitioner(graph.partitioner(), 3)); - System.out.println(IteratorUtils.asList(actors.getResults().get())); - if(IteratorUtils.count(actors.getResults().get()) != count) - throw new IllegalStateException(); - System.out.println("//////////////////////////////////\n"); - } - } - }*/ - - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java new file mode 100644 index 0000000..e0feef0 --- /dev/null +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessActorsSuite; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.junit.runner.RunWith; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +@RunWith(ProcessActorsSuite.class) +@GraphProviderClass(provider = AkkaActorsProvider.class, graph = TinkerGraph.class) +public class AkkaActorsProcessActorsTest { +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java new file mode 100644 index 0000000..4168445 --- /dev/null +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.AbstractGraphProvider; +import org.apache.tinkerpop.gremlin.LoadGraphWith; +import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.process.traversal.step.ComplexTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class AkkaActorsProvider extends AbstractGraphProvider { + + private static final Random RANDOM = new Random(); + + private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList( + "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name", + "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path", + "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack", + "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX", + GraphTest.Traversals.class.getCanonicalName(), + GroupTest.Traversals.class.getCanonicalName(), + ComplexTest.Traversals.class.getCanonicalName(), + SubgraphTest.Traversals.class.getCanonicalName(), + SideEffectTest.Traversals.class.getCanonicalName(), + SubgraphStrategyProcessTest.class.getCanonicalName(), + ProfileTest.Traversals.class.getCanonicalName(), + PartitionStrategyProcessTest.class.getCanonicalName(), + EventStrategyProcessTest.class.getCanonicalName(), + ElementIdStrategyProcessTest.class.getCanonicalName(), + TraversalInterruptionTest.class.getCanonicalName(), + ProgramTest.Traversals.class.getCanonicalName())); + + private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{ + add(TinkerEdge.class); + add(TinkerElement.class); + add(TinkerGraph.class); + add(TinkerGraphVariables.class); + add(TinkerProperty.class); + add(TinkerVertex.class); + add(TinkerVertexProperty.class); + }}; + + @Override + public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, + final LoadGraphWith.GraphData loadGraphWith) { + + final TinkerGraph.DefaultIdManager idManager = selectIdMakerFromGraphData(loadGraphWith); + final String idMaker = (idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? selectIdMakerFromGraphData(loadGraphWith) : idManager).name(); + return new HashMap<String, Object>() {{ + put(Graph.GRAPH, TinkerGraph.class.getName()); + put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker); + put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker); + put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker); + put("skipTest", SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName())); + if (loadGraphWith == LoadGraphWith.GraphData.CREW) + put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name()); + }}; + } + + @Override + public void clear(final Graph graph, final Configuration configuration) throws Exception { + if (graph != null) graph.close(); + } + + @Override + public Set<Class> getImplementations() { + return IMPLEMENTATION; + } + + /** + * Test that load with specific graph data can be configured with a specific id manager as the data type to + * be used in the test for that graph is known. + */ + protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) { + if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY; + if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) + return TinkerGraph.DefaultIdManager.INTEGER; + else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) + return TinkerGraph.DefaultIdManager.INTEGER; + else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) + return TinkerGraph.DefaultIdManager.INTEGER; + else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) + return TinkerGraph.DefaultIdManager.INTEGER; + else + throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name())); + } + +///////////////////////////// +///////////////////////////// +///////////////////////////// + + @Override + public GraphTraversalSource traversal(final Graph graph) { + if ((Boolean) graph.configuration().getProperty("skipTest")) + return graph.traversal(); + //throw new VerificationException("This test current does not work with Gremlin-Python", EmptyTraversal.instance()); + else { + final GraphTraversalSource g = graph.traversal(); + return RANDOM.nextBoolean() ? + g.withProcessor(AkkaGraphActors.open().workers(new Random().nextInt(15) + 1)) : + g.withProcessor(GraphActors.open(AkkaGraphActors.class)); + } + } + + @Override + public GraphActors getGraphActors(final Graph graph) { + return AkkaGraphActors.open().workers(new Random().nextInt(15) + 1); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java new file mode 100644 index 0000000..d4562eb --- /dev/null +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.junit.Ignore; +import org.junit.Test; + +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class AkkaPlayTest { + + @Test + @Ignore + public void testPlay1() throws Exception { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); + GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3)); + // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); + + for (int i = 0; i < 1000; i++) { + if (12l != g.V().union(out(), in()).values("name").count().next()) + System.out.println(i); + } + + //3, 1.9, 1 + /*for (int i = 0; i < 10000; i++) { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); + final GraphTraversalSource g = graph.traversal().withComputer(); + final List<Pair<Integer, Traversal.Admin<?, ?>>> traversals = Arrays.asList( + // match() works + Pair.with(6, g.V().match( + as("a").out("created").as("b"), + as("b").in("created").as("c"), + as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin()), + // side-effects work + Pair.with(3, g.V().repeat(both()).times(2). + groupCount("a").by("name"). + cap("a").unfold().order().by(Column.values, Order.decr).limit(3).asAdmin()), + // barriers work and beyond the local star graph works + Pair.with(1, g.V().repeat(both()).times(2).hasLabel("person"). + group(). + by("name"). + by(out("created").values("name").dedup().fold()).asAdmin()), + // no results works + Pair.with(0, g.V().out("blah").asAdmin()) + ); + for (final Pair<Integer,Traversal.Admin<?, ?>> pair : traversals) { + final Integer count = pair.getValue0(); + final Traversal.Admin<?,?> traversal = pair.getValue1(); + System.out.println("EXECUTING: " + traversal.getBytecode()); + final TinkerActorSystem<?,?> actors = new TinkerActorSystem<>(traversal.clone(),new HashPartitioner(graph.partitioner(), 3)); + System.out.println(IteratorUtils.asList(actors.getResults().get())); + if(IteratorUtils.count(actors.getResults().get()) != count) + throw new IllegalStateException(); + System.out.println("//////////////////////////////////\n"); + } + } + }*/ + + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java deleted file mode 100644 index 5a0b869..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor; - -import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Partitioner; - -import java.util.List; - -/** - * An Actor represents an isolated processing unit that can only be interacted with via messages. - * Actors are able to send and receive messages. The {@link GraphActors} framework has two types of actors: - * {@link Master} and {@link Worker}. A master actor is not associated with a particular graph {@link Partition}. - * Instead, its role is to coordinate the workers and ultimately, yield the final result of the submitted - * {@link ActorProgram}. - * - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public interface Actor { - - /** - * Get the {@link Partitioner} associated with the {@link GraphActors} system. - * - * @return the partitioner used to partition (logically and/or physically) the {@link org.apache.tinkerpop.gremlin.structure.Graph} - */ - public Partitioner partitioner(); - - /** - * Get the {@link Address} of the actor. - * - * @return the actor's address - */ - public Address address(); - - /** - * Get a list of the {@link Address} values of all the workers in {@link GraphActors} system. - * - * @return the worker's addresses - */ - public List<Address.Worker> workers(); - - /** - * Send a message from this actor to another actor given their {@link Address}. - * - * @param toActor the actor to receive the messages - * @param message the message being sent - * @param <M> the message type - */ - public <M> void send(final Address toActor, final M message); - - public interface Master extends Actor { - - public Address.Master address(); - - public void close(); - - public <R> ActorsResult<R> result(); - - } - - public interface Worker extends Actor { - - public Address.Worker address(); - - public Address.Master master(); - - /** - * Get the {@link Partition} associated with this worker. - * In principle, this is the subset of the {@link org.apache.tinkerpop.gremlin.structure.Graph} that - * the worker is "data-local" to. - * - * @return the worker's partition - */ - public Partition partition(); - } - - -}