Use akka blocking io dispatcher in routers
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9f2863fd Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9f2863fd Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9f2863fd Branch: refs/heads/master Commit: 9f2863fd6e28977551a6cd98ac44b869ac337608 Parents: 63561ee Author: Dave Johnson <[email protected]> Authored: Mon Oct 10 17:24:26 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Mon Oct 10 17:24:26 2016 -0400 ---------------------------------------------------------------------- .../uniquevalues/UniqueValuesRouter.java | 17 ++++------------- .../qakka/distributed/actors/QueueActorRouter.java | 3 ++- .../distributed/actors/QueueSenderRouter.java | 7 ++++--- .../distributed/actors/QueueWriterRouter.java | 7 ++++--- 4 files changed, 14 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java index 47db3a5..355320b 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java @@ -34,25 +34,16 @@ import org.slf4j.LoggerFactory; public class UniqueValuesRouter extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( UniqueValueActor.class ); - private final String name = RandomStringUtils.randomAlphanumeric( 4 ); - private final ActorRef router; + @Inject - public UniqueValuesRouter(Injector injector ) { + public UniqueValuesRouter() { router = getContext().actorOf( - FromConfig.getInstance() - .props(Props.create(UniqueValueActor.class) + FromConfig.getInstance().props( + Props.create( UniqueValueActor.class) .withDispatcher("akka.blocking-io-dispatcher")), "router"); - - // TODO: is there some way to pass the injector here without getting this exception: - // NotSerializableException: No configured serialization-bindings for class [InjectorImpl] - //router = getContext().actorOf( - //FromConfig.getInstance().props( Props.create( GuiceActorProducer.class, injector, UniqueValueActor.class)), - //"router" ); - - //logger.info("UniqueValuesRouter {} is live with injector {}", name, injector); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java index f908e7f..b5b9c30 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -46,7 +46,8 @@ public class QueueActorRouter extends UntypedActor { this.queueActorRouterProducer = queueActorRouterProducer; this.routerRef = getContext().actorOf( FromConfig.getInstance().props( - Props.create(GuiceActorProducer.class, QueueActor.class)), "router"); + Props.create( GuiceActorProducer.class, QueueActor.class) + .withDispatcher("akka.blocking-io-dispatcher")), "router"); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java index a205d71..88c5a4b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java @@ -38,16 +38,17 @@ public class QueueSenderRouter extends UntypedActor { @Inject - public QueueSenderRouter( Injector injector ) { + public QueueSenderRouter() { this.router = getContext().actorOf( FromConfig.getInstance().props( - Props.create( GuiceActorProducer.class, QueueSender.class )), "router"); + Props.create( GuiceActorProducer.class, QueueSender.class ) + .withDispatcher("akka.blocking-io-dispatcher")), "router"); } @Override public void onReceive(Object message) { - if ( message instanceof QueueSendRequest) { + if ( message instanceof QueueSendRequest ) { router.tell( message, getSender() ); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java index cb06c1d..c3436eb 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java @@ -36,18 +36,19 @@ public class QueueWriterRouter extends UntypedActor { private final ActorRef router; + @Inject public QueueWriterRouter() { this.router = getContext().actorOf( FromConfig.getInstance().props( - Props.create( GuiceActorProducer.class, QueueWriter.class )), "router"); + Props.create( GuiceActorProducer.class, QueueWriter.class ) + .withDispatcher("akka.blocking-io-dispatcher")), "router"); } @Override public void onReceive(Object message) { - if ( message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) { - + if ( message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) { router.tell( message, getSender() ); } else {
