Added AkkaConfigFactory, which handles creating an Akka Config; it is identical in nature to SparkConf and Hadoop's Configuration. Also improved the Akka GryoSerializer a bit. I had a default Gryo pool size of 256 and the test cases were slow; changing the pool size to 1 made them dramatically faster. Open question: what is the nature of serializer reuse in Akka?
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c4da4d32 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c4da4d32 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c4da4d32 Branch: refs/heads/TINKERPOP-1564 Commit: c4da4d321954d7894a4d2959c95aba0b65872549 Parents: f2a8c67 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Jan 12 07:24:12 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 10:28:05 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actors/AkkaConfigFactory.java | 62 ++++++++++++++++++++ .../akka/process/actors/AkkaGraphActors.java | 26 +------- .../akka/process/actors/io/GryoSerializer.java | 17 +++--- .../src/main/resources/application.conf | 48 ++++++--------- 4 files changed, 88 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c4da4d32/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java new file mode 100644 index 0000000..a85e25a --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +final class AkkaConfigFactory { + + private AkkaConfigFactory() { + // static method class + } + + static Config generateAkkaConfig(final ActorProgram actorProgram) { + final Map<String, String> registeredGryoClasses = new HashMap<>(); + new GryoSerializer().getGryoMapper().getRegisteredClasses().stream().filter(clazz -> !clazz.isArray()).forEach(clazz -> { + int index = clazz.getCanonicalName().lastIndexOf("."); + registeredGryoClasses.put(null == clazz.getEnclosingClass() ? + clazz.getCanonicalName() : + clazz.getCanonicalName().substring(0, index) + "$" + clazz.getCanonicalName().substring(index + 1), "gryo"); + }); + return ConfigFactory.defaultApplication(). + withValue("akka.actor.serialization-bindings", ConfigValueFactory.fromMap(registeredGryoClasses)). 
+ withValue("custom-dispatcher.mailbox-requirement", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName())). + withValue("custom-dispatcher-mailbox.mailbox-type", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName())). + withValue("akka.actor.mailbox.requirements", ConfigValueFactory.fromMap(Collections.singletonMap(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName(), "custom-dispatcher-mailbox"))). + withValue("message-priorities", + ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities(). + orElse(Collections.singletonList(Object.class)). + stream(). + map(Class::getCanonicalName). + collect(Collectors.toList()).toString())); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c4da4d32/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java index 7641b01..2638bfa 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java @@ -21,12 +21,8 @@ package org.apache.tinkerpop.gremlin.akka.process.actors; import akka.actor.ActorSystem; import akka.actor.Props; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import 
org.apache.tinkerpop.gremlin.process.actors.ActorsResult; import org.apache.tinkerpop.gremlin.process.actors.Address; @@ -41,12 +37,8 @@ import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; -import java.util.stream.Collectors; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -93,24 +85,8 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { if (this.executed) throw new IllegalStateException("Can not execute twice"); this.executed = true; - final Map<String, String> registeredGryoClasses = new HashMap<>(); - new GryoSerializer().getGryoMapper().getRegisteredClasses().stream().filter(clazz -> !clazz.isArray()).forEach(clazz -> { - int index = clazz.getCanonicalName().lastIndexOf("."); - registeredGryoClasses.put(null == clazz.getEnclosingClass() ? - clazz.getCanonicalName() : - clazz.getCanonicalName().substring(0, index) + "$" + clazz.getCanonicalName().substring(index + 1), "gryo"); - }); - final Config config = ConfigFactory.defaultApplication(). - withValue("message-priorities", - ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities(). - orElse(Collections.singletonList(Object.class)). - stream(). - map(Class::getCanonicalName). - collect(Collectors.toList()).toString())). - withValue("akka.actor.serialization-bindings", ConfigValueFactory.fromMap(registeredGryoClasses)); - - final ActorSystem system = ActorSystem.create("traversal", config); + final ActorSystem system = ActorSystem.create("traversal", AkkaConfigFactory.generateAkkaConfig(this.actorProgram)); final ActorsResult<R> result = new DefaultActorsResult<>(); final Partitioner partitioner = this.workers == 1 ? 
graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers); try { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c4da4d32/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java index 188ba9f..c567497 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java @@ -45,7 +45,7 @@ public final class GryoSerializer implements Serializer { public GryoSerializer() { this.gryoPool = GryoPool.build(). - poolSize(100). + poolSize(1). initializeMapper(builder -> builder.referenceTracking(true). registrationRequired(true). 
@@ -65,16 +65,15 @@ public final class GryoSerializer implements Serializer { @Override public int identifier() { - return 0; + return GryoVersion.V3_0.hashCode(); } @Override public byte[] toBinary(final Object object) { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - final Output output = new Output(outputStream); - this.gryoPool.writeWithKryo(kryo -> kryo.writeClassAndObject(output, object)); + final Output output = new Output(new ByteArrayOutputStream()); + this.gryoPool.writeWithKryo(kryo -> kryo.writeObject(output, object)); output.flush(); - return outputStream.toByteArray(); + return output.getBuffer(); } @Override @@ -96,8 +95,8 @@ public final class GryoSerializer implements Serializer { @Override public Object fromBinary(byte[] bytes, Class<?> aClass) { - final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - final Input input = new Input(inputStream); - return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); // todo: be smart about just reading object + final Input input = new Input(); + input.setBuffer(bytes); + return this.gryoPool.readWithKryo(kryo -> kryo.readObject(input, aClass)); // todo: be smart about just reading object } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c4da4d32/akka-gremlin/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf index d1dbf4a..cd8b190 100644 --- a/akka-gremlin/src/main/resources/application.conf +++ b/akka-gremlin/src/main/resources/application.conf @@ -1,38 +1,24 @@ akka { log-dead-letters-during-shutdown = "false" -} - -custom-dispatcher-mailbox { - mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox" -} - -custom-dispatcher { - mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" -} - -akka.actor { 
- provider = remote - serialize-messages = on - serializers { - gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer" + actor { + provider = remote + serialize-messages = on + serializers { + gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer" + } } - mailbox.requirements { - "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" + port = 2552 + } } -} + cluster { + seed-nodes = [ + "akka.tcp://traversal@127.0.0.1:2551", + "akka.tcp://traversal@127.0.0.1:2552"] -akka.remote { - enabled-transports = ["akka.remote.netty.tcp"] - netty.tcp { - hostname = "127.0.0.1" - port = 2552 + auto-down-unreachable-after = 10s } -} - -akka.cluster { - seed-nodes = [ - "akka.tcp://traversal@127.0.0.1:2551", - "akka.tcp://traversal@127.0.0.1:2552"] - - auto-down-unreachable-after = 10s } \ No newline at end of file