Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 df5992755 -> d013d4052
AkkaGraphActors now configures the GryoSerializer registration using the ActorProgram getMessageTypes(). Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d013d405 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d013d405 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d013d405 Branch: refs/heads/TINKERPOP-1564 Commit: d013d4052a01a33d89a25baf6fdd1e64955ace9a Parents: df59927 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Jan 25 13:35:36 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Wed Jan 25 13:35:36 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actors/AkkaConfigFactory.java | 2 +- .../gremlin/akka/process/actors/Constants.java | 1 + .../process/actors/io/gryo/GryoSerializer.java | 57 +++++++++++--------- 3 files changed, 34 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d013d405/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java index a49b545..1422e07 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java @@ -48,7 +48,7 @@ final class AkkaConfigFactory { static Config generateAkkaConfig(final ActorProgram<?> actorProgram, final Configuration configuration) { Config config = ConfigFactory.defaultApplication(). 
- withValue("akka.actor.serialization-bindings", ConfigValueFactory.fromMap(GryoSerializer.getSerializerBindings(configuration))). + withValue(Constants.AKKA_ACTOR_SERIALIZATION_BINDINGS, ConfigValueFactory.fromMap(GryoSerializer.getSerializerBindings(actorProgram, configuration))). withValue("custom-dispatcher.mailbox-requirement", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName())). withValue("custom-dispatcher-mailbox.mailbox-type", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName())). withValue("akka.actor.mailbox.requirements", ConfigValueFactory.fromMap(Collections.singletonMap(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName(), "custom-dispatcher-mailbox"))). http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d013d405/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/Constants.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/Constants.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/Constants.java index ed4f383..c54f81a 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/Constants.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/Constants.java @@ -38,5 +38,6 @@ public final class Constants { public static final String AKKA_CLUSTER_SEED_NODES = "akka.cluster.seed-nodes"; public static final String AKKA_CLUSTER_AUTO_DOWN_UNREACHABLE_AFTER = "akka.cluster.auto-down-unreachable-after"; public static final String GREMLIN_AKKA_SYSTEM_NAME = "gremlin.akka.system-name"; + public static final String AKKA_ACTOR_SERIALIZATION_BINDINGS = "akka.actor.serialization-bindings"; } 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d013d405/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java index 733528c..8d567ba 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java @@ -24,11 +24,7 @@ import akka.serialization.Serializer; import com.typesafe.config.Config; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult; import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; @@ -43,8 +39,12 @@ import scala.Option; import java.io.ByteArrayOutputStream; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; + +import static org.apache.tinkerpop.gremlin.akka.process.actors.Constants.AKKA_ACTOR_SERIALIZATION_BINDINGS; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -55,6 +55,22 @@ public final class GryoSerializer implements Serializer { public GryoSerializer(final ExtendedActorSystem actorSystem) { final Config config = actorSystem.settings().config(); + final Set<Class> gryoClasses = new HashSet<>(); + for (final Map.Entry<String, String> entry : ((Map<String, String>) config.getAnyRef(AKKA_ACTOR_SERIALIZATION_BINDINGS)).entrySet()) { + if (entry.getValue().equals("gryo")) { + gryoClasses.add(ClassUtil.getClassOrEnum(entry.getKey())); + } + } + // remove Gryo 3.0 classes + GryoVersion.V3_0.getRegistrations().forEach(type -> gryoClasses.remove(type.getTargetClass())); + // this sucks. how to do this automatically? + gryoClasses.remove(Short.class); + gryoClasses.remove(Integer.class); + gryoClasses.remove(Float.class); + gryoClasses.remove(Double.class); + gryoClasses.remove(Long.class); + gryoClasses.remove(String.class); + // final List<IoRegistry> registryList; if (config.hasPath(IoRegistry.IO_REGISTRY)) { final Configuration configuration = new BaseConfiguration(); @@ -66,33 +82,24 @@ public final class GryoSerializer implements Serializer { this.gryoPool = GryoPool.build(). poolSize(10). initializeMapper(builder -> - builder.referenceTracking(true). - registrationRequired(true). + builder.referenceTracking(true). // config.getBoolean("gremlin.gryo.referenceTracking")). + registrationRequired(true). // config.getBoolean("gremlin.gryo.registrationRequired")). version(GryoVersion.V3_0). - addRegistries(registryList). - addCustom( - Terminate.class, - BarrierAddMessage.class, - BarrierDoneMessage.class, - SideEffectSetMessage.class, - SideEffectAddMessage.class, - DefaultActorsResult.class)).create(); + addCustom(gryoClasses.toArray(new Class[gryoClasses.size()])). 
+ addRegistries(registryList)).create(); } - public static Map<String, String> getSerializerBindings(final Configuration configuration) { + public static Map<String, String> getSerializerBindings(final ActorProgram<?> actorProgram, final Configuration configuration) { + final Set<Class> programMessageClasses = new HashSet<>(actorProgram.getMessageTypes().keySet()); + programMessageClasses.add(DefaultActorsResult.class); // todo: may make this a Gryo3.0 class in the near future + GryoVersion.V3_0.getRegistrations().forEach(type -> programMessageClasses.remove(type.getTargetClass())); final Map<String, String> bindings = new HashMap<>(); GryoMapper.build(). - referenceTracking(true). - registrationRequired(true). + referenceTracking(configuration.getBoolean("gremlin.gryo.referenceTracking", true)). + registrationRequired(configuration.getBoolean("gremlin.gryo.registrationRequired", true)). version(GryoVersion.V3_0). + addCustom(programMessageClasses.toArray(new Class[programMessageClasses.size()])). addRegistries(IoRegistryHelper.createRegistries(configuration)). - addCustom( - Terminate.class, - BarrierAddMessage.class, - BarrierDoneMessage.class, - SideEffectSetMessage.class, - SideEffectAddMessage.class, - DefaultActorsResult.class). create(). getRegisteredClasses(). stream().