P.S , New to akka and scala
I have an usecase to create http host connection pool to a http rest server
from set of actors.
.Each of these are child actors and part of a Router Logic following
SmallMailBoxSizeRouterlogic.The router actors can also be multiple based on
application flow and created dynamically based on a trigger
Currently a connection pool flow is created in the child actor (I followed
the sample in documentation).The code is as follows
class WorkerActor extends Actor with ActorLogging {
val QueueSize = 100
//A create http client pool to core server
val poolClientFlow =
Http().cachedHostConnectionPool[Promise[HttpResponse]](coreServerIP,
coreServerPort)
poolClientFlow.log(coreServerIP)
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize,
OverflowStrategy.dropNew)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new
RuntimeException("Queue
overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(new
RuntimeException("Queue was closed (pool shut down) while running the
request. Try again later."))
}
}
def receive: Receive = {
case request: Request =>
log.debug("Received message for processing "+request)
val responseFuture: Future[HttpResponse] = queueRequest(httpReq)
handleRequestAndReponse(startTime,responseFuture, sender, request,
currentTime)
}
}
//and the RouterActor
class SmallestMailBoxRouterActor(actorProps: Props, workerCount: Int)
extends Actor with ActorLogging {
var router = {
val routees = Vector.fill(workerCount) {
val r = context.actorOf(actorProps)
context watch r
ActorRefRoutee(r)
}
Router(SmallestMailboxRoutingLogic(), routees)
}
def receive = {
case w: Request ⇒
router.route(w, sender())
case Terminated(a) ⇒
router = router.removeRoutee(a)
val r = context.actorOf(actorProps)
context watch r
router = router.addRoutee(r)
}
}
//Actor refrence is created
val routerActor =
SmallestMailBoxRouterActor.createActorRef(WorkerActor.props("new#id1"))
val routerActor1 = SmallestMailBoxRouterActor.createActorRef(WorkerActor
.props("new#id2"))
This works fine.But I am not sure , whether multiple connection pools are
created as pool flow is created at the child actor.
Aim is to create a pool per "routerActor#" and re-use it in WorkerActors.
As per the akka streams documentation , Flow is thread safe and can
exchanged between actors.
But coudn;t find a snippet to do that.
Can any one give an idea how to share Flow object between actors instances.
regards
Abhijith V R
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.