what a day. So I have Akka remoting working (not fully). We now have akka.io.GryoSerializer :). And TraversalActorProgram is smart about detach() and attach(). Learned a bunch about how to do Partitioner/Partitions in TinkerPop.... stuff is coming along nicely. Need a break though...been coding for 4 hours straight.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0e7b6ae1 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0e7b6ae1 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0e7b6ae1 Branch: refs/heads/TINKERPOP-1564 Commit: 0e7b6ae16c58f61b823a4cc02073a141d84b97bd Parents: 63f5c0f Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Jan 11 14:38:13 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 10:27:16 2017 -0700 ---------------------------------------------------------------------- akka-gremlin/pom.xml | 11 +++ .../akka/process/actors/AkkaGraphActors.java | 8 +- .../akka/process/actors/MasterActor.java | 6 +- .../akka/process/actors/io/GryoSerializer.java | 90 ++++++++++++++++++++ .../src/main/resources/application.conf | 40 +++++++-- .../akka/process/actors/AkkaPlayTest.java | 11 +-- .../gremlin/process/actors/ActorProgram.java | 14 +++ .../actors/traversal/TraversalActorProgram.java | 6 +- .../traversal/TraversalMasterProgram.java | 15 +++- .../traversal/TraversalWorkerProgram.java | 37 +++++--- .../traversal/message/BarrierAddMessage.java | 8 +- .../traversal/message/SideEffectAddMessage.java | 8 +- .../tinkerpop/gremlin/structure/Partition.java | 4 +- .../gremlin/structure/util/Attachable.java | 37 +++++++- .../util/partitioner/GlobalPartitioner.java | 13 ++- .../util/config/SerializableConfiguration.java | 77 +++++++++++++++++ 16 files changed, 347 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/pom.xml ---------------------------------------------------------------------- diff --git a/akka-gremlin/pom.xml b/akka-gremlin/pom.xml index f88ec7d..daebdfe 100644 --- a/akka-gremlin/pom.xml +++ b/akka-gremlin/pom.xml @@ -47,6 +47,17 @@ </exclusions> </dependency> <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_2.11</artifactId> + <version>2.4.14</version> + <exclusions> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java index 3bd5fa6..acc06ff 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java @@ -37,11 +37,11 @@ import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; +import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -57,8 +57,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { private boolean executed = false; private AkkaGraphActors(final Configuration configuration) { - this.configuration = new BaseConfiguration(); - ConfigurationUtils.copy(configuration, this.configuration); + this.configuration = new SerializableConfiguration(configuration); this.configuration.setProperty(GRAPH_ACTORS, AkkaGraphActors.class.getCanonicalName()); GraphActorsHelper.configure(this, this.configuration); } @@ -98,7 +97,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { stream(). map(Class::getCanonicalName). collect(Collectors.toList()).toString())); - final ActorSystem system = ActorSystem.create("traversal-" + UUID.randomUUID(), config); + final ActorSystem system = ActorSystem.create("traversal", config); final ActorsResult<R> result = new DefaultActorsResult<>(); final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers); try { @@ -126,5 +125,6 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { public static AkkaGraphActors open() { return new AkkaGraphActors(new BaseConfiguration()); } + } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java index 97951a8..b9c30bf 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java @@ -21,9 +21,12 @@ package org.apache.tinkerpop.gremlin.akka.process.actors; import akka.actor.AbstractActor; import akka.actor.ActorSelection; +import akka.actor.AddressFromURIString; +import akka.actor.Deploy; import akka.actor.Props; import akka.dispatch.RequiresMessageQueue; import akka.japi.pf.ReceiveBuilder; +import akka.remote.RemoteScope; import org.apache.tinkerpop.gremlin.process.actors.Actor; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; @@ -61,9 +64,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ this.workers = new ArrayList<>(); final List<Partition> partitions = partitioner.getPartitions(); for (final Partition partition : partitions) { + akka.actor.Address addr = AddressFromURIString.parse("akka.tcp://traversal@127.0.0.1:2552"); final String workerPathString = "worker-" + partition.id(); this.workers.add(new Address.Worker(workerPathString, partition.location())); - context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString); + context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner).withDeploy(new Deploy(new RemoteScope(addr))), workerPathString); } this.masterProgram = program.createMasterProgram(this); receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java new file mode 100644 index 0000000..ab2b16a --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors.io; + +import akka.serialization.Serializer; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.apache.tinkerpop.shaded.kryo.io.Input; +import org.apache.tinkerpop.shaded.kryo.io.Output; +import scala.Option; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class GryoSerializer implements Serializer { + + private final GryoPool gryoPool; + + public GryoSerializer() { + this.gryoPool = GryoPool.build(). + poolSize(100). + initializeMapper(builder -> + builder.referenceTracking(true). + registrationRequired(true). + addCustom( + StartMessage.class, + BarrierAddMessage.class, + SideEffectAddMessage.class)).create(); + } + + @Override + public int identifier() { + return 0; + } + + @Override + public byte[] toBinary(final Object object) { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final Output output = new Output(outputStream); + this.gryoPool.writeWithKryo(kryo -> kryo.writeClassAndObject(output, object)); + output.flush(); + return outputStream.toByteArray(); + } + + @Override + public boolean includeManifest() { + return true; + } + + @Override + public Object fromBinary(byte[] bytes, Option<Class<?>> option) { + return option.isEmpty() ? this.fromBinary(bytes) : this.fromBinary(bytes, option.get()); + } + + @Override + public Object fromBinary(byte[] bytes) { + final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + final Input input = new Input(inputStream); + return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); + } + + @Override + public Object fromBinary(byte[] bytes, Class<?> aClass) { + final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + final Input input = new Input(inputStream); + return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); // todo: be smart about just reading object + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf index 7ee599a..393881a 100644 --- a/akka-gremlin/src/main/resources/application.conf +++ b/akka-gremlin/src/main/resources/application.conf @@ -1,15 +1,43 @@ +akka { + log-dead-letters-during-shutdown = "false" +} + +custom-dispatcher-mailbox { + mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox" +} + custom-dispatcher { mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" } -akka.actor.mailbox.requirements { - "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox +akka.actor { + provider = local + serialize-messages = off + serializers { + gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer" + } + serialization-bindings { + "org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage" = gryo + "org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage" = gryo + "org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram" = gryo + } + mailbox.requirements { + "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox + } } -akka { - log-dead-letters-during-shutdown = "false" +akka.remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" + port = 2552 + } } -custom-dispatcher-mailbox { - mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox" +akka.cluster { + seed-nodes = [ + "akka.tcp://traversal@127.0.0.1:2551", + "akka.tcp://traversal@127.0.0.1:2552"] + + auto-down-unreachable-after = 10s } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java index d4562eb..c95f336 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java @@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors; import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; import org.junit.Ignore; @@ -30,6 +31,7 @@ import org.junit.Test; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -42,12 +44,11 @@ public class AkkaPlayTest { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3)); - // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); + // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); + + System.out.println(g.V().groupCount().by(T.label).toList()); + - for (int i = 0; i < 1000; i++) { - if (12l != g.V().union(out(), in()).values("name").count().next()) - System.out.println(i); - } //3, 1.9, 1 /*for (int i = 0; i < 10000; i++) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java index b1e3065..e3713ad 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java @@ -22,6 +22,7 @@ package org.apache.tinkerpop.gremlin.process.actors; import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.structure.Graph; +import java.lang.reflect.Constructor; import java.util.List; import java.util.Optional; @@ -99,6 +100,19 @@ public interface ActorProgram extends Cloneable { @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException") public ActorProgram clone(); + public static <A extends ActorProgram> A createActorProgram(final Graph graph, final Configuration configuration) { + try { + final Class<A> actorProgramClass = (Class) Class.forName(configuration.getString(ACTOR_PROGRAM)); + final Constructor<A> constructor = actorProgramClass.getDeclaredConstructor(); + constructor.setAccessible(true); + final A actorProgram = constructor.newInstance(); + actorProgram.loadState(graph, configuration); + return actorProgram; + } catch (final Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + /** * The Worker program is executed by a worker process in the {@link GraphActors} system. * There are many workers and a single master. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java index 484b904..c97ffd7 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java @@ -44,7 +44,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.Repe import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -53,7 +55,7 @@ import java.util.Optional; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class TraversalActorProgram<R> implements ActorProgram { +public final class TraversalActorProgram<R> implements ActorProgram, Serializable { public static final String TRAVERSAL_ACTOR_PROGRAM_BYTECODE = "gremlin.traversalActorProgram.bytecode"; @@ -68,9 +70,11 @@ public final class TraversalActorProgram<R> implements ActorProgram { private Traversal.Admin<?, R> traversal; public TraverserSet<R> result = new TraverserSet<>(); + private Configuration configuration; public TraversalActorProgram(final Traversal.Admin<?, R> traversal) { this.traversal = traversal; + this.configuration = new SerializableConfiguration(configuration); final TraversalStrategies strategies = this.traversal.getStrategies().clone(); strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance()); // TODO: make TinkerGraph/etc. strategies smart about actors http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java index e447cdb..796e4c1 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java @@ -44,6 +44,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.util.Attachable; import java.util.HashMap; import java.util.Map; @@ -144,6 +145,7 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> { } private void processTraverser(final Traverser.Admin traverser) { + this.attachTraverser(traverser); if (traverser.isHalted() || traverser.get() instanceof Element) { this.sendTraverser(traverser); } else { @@ -163,9 +165,9 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> { if (traverser.isHalted()) this.results.add(traverser); else if (traverser.get() instanceof Element) - this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser); + this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), this.detachTraverser(traverser)); else - this.master.send(this.master.address(), traverser); + this.master.send(this.master.address(), this.detachTraverser(traverser)); } private void orderBarrier(final Step step) { @@ -176,4 +178,13 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> { barrier.addBarrier(rangingBarrier); } } + + private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) { + return true ? traverser : traverser.detach(); + } + + private void attachTraverser(final Traverser.Admin traverser) { + if (false && traverser.get() instanceof Element) + traverser.attach(Attachable.Method.get(this.master.partitioner().getPartition((Element) traverser.get()))); + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java index 127322f..fa5645d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java @@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.Attachable; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.util.HashMap; @@ -99,10 +100,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> { //System.out.println(message + "::" + this.isLeader); if (message instanceof StartMessage) { // initial message from master that says: "start processing" - final GraphStep<?,?> step = (GraphStep) this.matrix.getTraversal().getStartStep(); + final GraphStep<?, ?> step = (GraphStep) this.matrix.getTraversal().getStartStep(); while (step.hasNext()) { final Traverser.Admin<? extends Element> traverser = step.next(); - this.self.send(traverser.isHalted() ? this.self.master() : this.self.address(), traverser); + this.self.send(traverser.isHalted() ? this.self.master() : this.self.address(), this.detachTraverser(traverser)); } } else if (message instanceof Traverser.Admin) { this.processTraverser((Traverser.Admin) message); @@ -147,20 +148,26 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> { ////////////// private void processTraverser(final Traverser.Admin traverser) { - assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get()); - final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); - step.addStart(traverser); - if (step instanceof Barrier) { - this.barriers.put(step.getId(), (Barrier) step); - } else { - while (step.hasNext()) { - this.sendTraverser(step.next()); + assert !(traverser.get() instanceof Element) || this.self.partition().contains((Element) traverser.get()); + if (traverser.isHalted()) + this.sendTraverser(traverser); + else { + this.attachTraverser(traverser); + final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); + step.addStart(traverser); + if (step instanceof Barrier) { + this.barriers.put(step.getId(), (Barrier) step); + } else { + while (step.hasNext()) { + this.sendTraverser(step.next()); + } } } } private void sendTraverser(final Traverser.Admin traverser) { this.voteToHalt = false; + this.detachTraverser(traverser); if (traverser.isHalted()) this.self.send(this.self.master(), traverser); else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) @@ -168,4 +175,14 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> { else this.self.send(this.self.address(), traverser); } + + private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) { + return true ? traverser : traverser.detach(); + } + + private final Traverser.Admin attachTraverser(final Traverser.Admin traverser) { + if (false) + traverser.attach(Attachable.Method.get(this.self.partition())); + return traverser; + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java index ac4c61d..ade6796 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java @@ -27,8 +27,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; */ public final class BarrierAddMessage { - private final Object barrier; - private final String stepId; + private Object barrier; + private String stepId; + + private BarrierAddMessage() { + // for serialization + } public BarrierAddMessage(final Barrier barrier) { this.barrier = barrier.nextBarrier(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java index 1c0a9de..bcc3223 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java @@ -24,8 +24,12 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.message; */ public final class SideEffectAddMessage { - private final String key; - private final Object value; + private String key; + private Object value; + + private SideEffectAddMessage() { + // for serialization + } public SideEffectAddMessage(final String key, final Object value) { this.value = value; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java index f20b9fb..49389f1 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java @@ -19,6 +19,8 @@ package org.apache.tinkerpop.gremlin.structure; +import org.apache.tinkerpop.gremlin.structure.util.Host; + import java.net.InetAddress; import java.net.URI; import java.util.Iterator; @@ -32,7 +34,7 @@ import java.util.UUID; * * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface Partition { +public interface Partition extends Host { /** * Whether or not this element was, is, or will be contained in this partition. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java index fa999aa..f748ee6 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.Vertex; @@ -75,21 +76,27 @@ public interface Attachable<V> { if (base instanceof Vertex) { final Optional<Vertex> optional = hostVertexOrGraph instanceof Graph ? Method.getVertex((Attachable<Vertex>) attachable, (Graph) hostVertexOrGraph) : - Method.getVertex((Attachable<Vertex>) attachable, (Vertex) hostVertexOrGraph); + hostVertexOrGraph instanceof Vertex ? + Method.getVertex((Attachable<Vertex>) attachable, (Vertex) hostVertexOrGraph) : + Method.getVertex((Attachable<Vertex>) attachable, (Partition) hostVertexOrGraph); return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ? Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) : Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph)); } else if (base instanceof Edge) { final Optional<Edge> optional = hostVertexOrGraph instanceof Graph ? Method.getEdge((Attachable<Edge>) attachable, (Graph) hostVertexOrGraph) : - Method.getEdge((Attachable<Edge>) attachable, (Vertex) hostVertexOrGraph); + hostVertexOrGraph instanceof Vertex ? + Method.getEdge((Attachable<Edge>) attachable, (Vertex) hostVertexOrGraph) : + Method.getEdge((Attachable<Edge>) attachable, (Partition) hostVertexOrGraph); return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ? Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) : Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph)); } else if (base instanceof VertexProperty) { final Optional<VertexProperty> optional = hostVertexOrGraph instanceof Graph ? Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Graph) hostVertexOrGraph) : - Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Vertex) hostVertexOrGraph); + hostVertexOrGraph instanceof Vertex ? + Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Vertex) hostVertexOrGraph) : + Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Partition) hostVertexOrGraph); return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ? Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) : Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph)); @@ -178,6 +185,11 @@ public interface Attachable<V> { return ElementHelper.areEqual(attachableVertex.get(), hostVertex) ? Optional.of(hostVertex) : Optional.empty(); } + public static Optional<Vertex> getVertex(final Attachable<Vertex> attachableVertex, final Partition hostPartition) { + final Iterator<Vertex> iterator = hostPartition.vertices(attachableVertex.get().id()); + return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(); + } + public static Optional<Edge> getEdge(final Attachable<Edge> attachableEdge, final Graph hostGraph) { final Iterator<Edge> edgeIterator = hostGraph.edges(attachableEdge.get().id()); return edgeIterator.hasNext() ? Optional.of(edgeIterator.next()) : Optional.empty(); @@ -194,6 +206,11 @@ public interface Attachable<V> { return Optional.empty(); } + public static Optional<Edge> getEdge(final Attachable<Edge> attachableEdge, final Partition hostPartition) { + final Iterator<Edge> iterator = hostPartition.edges(attachableEdge.get().id()); + return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(); + } + public static Optional<VertexProperty> getVertexProperty(final Attachable<VertexProperty> attachableVertexProperty, final Graph hostGraph) { final VertexProperty baseVertexProperty = attachableVertexProperty.get(); final Iterator<Vertex> vertexIterator = hostGraph.vertices(baseVertexProperty.element().id()); @@ -219,6 +236,20 @@ public interface Attachable<V> { return Optional.empty(); } + public static Optional<VertexProperty> getVertexProperty(final Attachable<VertexProperty> attachableVertexProperty, final Partition hostPartition) { + final VertexProperty baseVertexProperty = attachableVertexProperty.get(); + final Iterator<Vertex> vertexIterator= hostPartition.vertices(baseVertexProperty.element().id()); + if (vertexIterator.hasNext()) { + final Iterator<VertexProperty<Object>> vertexPropertyIterator = vertexIterator.next().properties(baseVertexProperty.key()); + while (vertexPropertyIterator.hasNext()) { + final VertexProperty vertexProperty = vertexPropertyIterator.next(); + if (ElementHelper.areEqual(vertexProperty, baseVertexProperty)) + return Optional.of(vertexProperty); + } + } + return Optional.empty(); + } + public static Optional<Property> getProperty(final Attachable<Property> attachableProperty, final Graph hostGraph) { final Property baseProperty = attachableProperty.get(); final Element propertyElement = attachableProperty.get().element(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java index 4d9f565..397c113 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java @@ -19,19 +19,24 @@ package org.apache.tinkerpop.gremlin.structure.util.partitioner; +import org.apache.commons.configuration.MapConfiguration; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; +import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -61,12 +66,14 @@ public final class GlobalPartitioner implements Partitioner { private class GlobalPartition implements Partition { - private final Graph graph; + private transient Graph graph; + private final Map<String, Object> configuration = new HashMap<>(); private final String id; private final InetAddress location; private GlobalPartition(final Graph graph) { this.graph = graph; + graph.configuration().getKeys().forEachRemaining(key -> configuration.put(key, graph.configuration().getProperty(key))); this.id = "global-" + graph.getClass().getSimpleName().toLowerCase(); try { this.location = InetAddress.getLocalHost(); @@ -82,11 +89,15 @@ public final class GlobalPartitioner implements Partitioner { @Override public Iterator<Vertex> vertices(final Object... ids) { + if(null == this.graph) + this.graph = GraphFactory.open(new MapConfiguration(this.configuration)); return this.graph.vertices(ids); } @Override public Iterator<Edge> edges(final Object... ids) { + if(null == this.graph) + this.graph = GraphFactory.open(new MapConfiguration(this.configuration)); return this.graph.edges(ids); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java new file mode 100644 index 0000000..2a1eac1 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.util.config; + +import org.apache.commons.configuration.AbstractConfiguration; +import org.apache.commons.configuration.Configuration; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class SerializableConfiguration extends AbstractConfiguration implements Serializable { + + private final Map<String, Object> properties = new HashMap<>(); + + public SerializableConfiguration() { + super(); + super.setDelimiterParsingDisabled(true); + } + + public SerializableConfiguration(final Configuration configuration) { + this(); + this.copy(configuration); + } + + @Override + protected void addPropertyDirect(final String key, final Object value) { + this.properties.put(key, value); + } + + @Override + protected void clearPropertyDirect(final String key) { + this.properties.remove(key); + } + + @Override + public boolean isEmpty() { + return this.properties.isEmpty(); + } + + @Override + public boolean containsKey(final String key) { + return this.properties.containsKey(key); + } + + @Override + public Object getProperty(final String key) { + return this.properties.get(key); + } + + @Override + public Iterator<String> getKeys() { + return this.properties.keySet().iterator(); + } + +}