P.S. I am new to Akka and Scala.

I have a use case where a set of actors needs an HTTP host connection pool
to an HTTP REST server.

Each of these is a child actor and a routee of a router that uses
SmallestMailboxRoutingLogic. There can be several router actors, depending on
the application flow; they are created dynamically in response to a trigger.



Currently the connection pool flow is created in the child actor (I followed
the sample in the documentation). The code is as follows:

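import akka.actor.{Actor, ActorLogging}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// Not shown here: the implicit ActorSystem, Materializer and ExecutionContext,
// plus coreServerIP, coreServerPort, httpReq, startTime, currentTime,
// Request and handleRequestAndReponse.
class WorkerActor extends Actor with ActorLogging {

  val QueueSize = 100

  // Create an HTTP client pool flow to the core server
  val poolClientFlow =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](coreServerIP, coreServerPort)

  // Note: log() returns a new Flow; the result is discarded here
  poolClientFlow.log(coreServerIP)

  // Queue in front of the pool; each response completes the promise paired with its request
  val queue =
    Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
      .via(poolClientFlow)
      .toMat(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p)) => p.failure(e)
      }))(Keep.left)
      .run()

  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    queue.offer(request -> responsePromise).flatMap {
      case QueueOfferResult.Enqueued => responsePromise.future
      case QueueOfferResult.Dropped =>
        Future.failed(new RuntimeException("Queue overflowed. Try again later."))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed =>
        Future.failed(new RuntimeException(
          "Queue was closed (pool shut down) while running the request. Try again later."))
    }
  }

  def receive: Receive = {
    case request: Request =>
      log.debug("Received message for processing " + request)
      val responseFuture: Future[HttpResponse] = queueRequest(httpReq)
      handleRequestAndReponse(startTime, responseFuture, sender(), request, currentTime)
  }
}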



// and the RouterActor

import akka.actor.{Actor, ActorLogging, Props, Terminated}
import akka.routing.{ActorRefRoutee, Router, SmallestMailboxRoutingLogic}

class SmallestMailBoxRouterActor(actorProps: Props, workerCount: Int)
    extends Actor with ActorLogging {

  // Create workerCount routees and watch each one for termination
  var router = {
    val routees = Vector.fill(workerCount) {
      val r = context.actorOf(actorProps)
      context watch r
      ActorRefRoutee(r)
    }
    Router(SmallestMailboxRoutingLogic(), routees)
  }

  def receive = {
    case w: Request ⇒
      router.route(w, sender())
    case Terminated(a) ⇒
      // Replace a terminated routee with a fresh one
      router = router.removeRoutee(a)
      val r = context.actorOf(actorProps)
      context watch r
      router = router.addRoutee(r)
  }
}


// The actor references are created

val routerActor =
  SmallestMailBoxRouterActor.createActorRef(WorkerActor.props("new#id1"))

val routerActor1 =
  SmallestMailBoxRouterActor.createActorRef(WorkerActor.props("new#id2"))


This works fine, but I am not sure whether multiple connection pools are
created, since the pool flow is created in each child actor.

The aim is to create one pool per "routerActor#" and reuse it in its
WorkerActors.

As per the Akka Streams documentation, a Flow is thread safe and can be
exchanged between actors.

But I couldn't find a snippet that does that.

Can anyone give me an idea of how to share a Flow object between actor
instances?
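
To make the question concrete, here is an untested sketch of what I have in
mind: the router creates the pool flow once and hands it to each worker
through Props, and each worker materializes its own queue on top of that
flow. The names PooledWorker, PoolOwningRouter and PoolFlow are made up for
this illustration, the workers take plain HttpRequest messages instead of my
Request type, and I am assuming the ActorMaterializer-based API. I do not
know whether this is the recommended way to do it.

```scala
import akka.actor.{Actor, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.pattern.pipe
import akka.routing.{ActorRefRoutee, Router, SmallestMailboxRoutingLogic}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

object PooledWorker {
  // Hypothetical aliases for the types used with cachedHostConnectionPool[Promise[HttpResponse]]
  type PoolOut  = (Try[HttpResponse], Promise[HttpResponse])
  type PoolFlow = Flow[(HttpRequest, Promise[HttpResponse]), PoolOut, Http.HostConnectionPool]

  def props(poolFlow: PoolFlow): Props = Props(new PooledWorker(poolFlow))
}

// Hypothetical worker: it receives the flow from its parent instead of creating it
class PooledWorker(poolFlow: PooledWorker.PoolFlow) extends Actor {
  import context.dispatcher // ExecutionContext for flatMap and pipeTo
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Each worker still runs its own queue, but through the flow it was given
  private val queue =
    Source.queue[(HttpRequest, Promise[HttpResponse])](100, OverflowStrategy.dropNew)
      .via(poolFlow)
      .toMat(Sink.foreach[PooledWorker.PoolOut] {
        case (Success(resp), p) => p.success(resp)
        case (Failure(e), p)    => p.failure(e)
      })(Keep.left)
      .run()

  private def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    queue.offer(request -> responsePromise).flatMap {
      case QueueOfferResult.Enqueued    => responsePromise.future
      case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed."))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed."))
    }
  }

  def receive: Receive = {
    // Send the response (or a Status.Failure) back to the original sender
    case request: HttpRequest => queueRequest(request).pipeTo(sender())
  }
}

object PoolOwningRouter {
  def props(host: String, port: Int, workerCount: Int): Props =
    Props(new PoolOwningRouter(host, port, workerCount))
}

// Hypothetical router: it creates the flow once and passes it to every worker
class PoolOwningRouter(host: String, port: Int, workerCount: Int) extends Actor {
  // Assumption: some akka-http versions need an implicit Materializer for the pool call
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  private val poolFlow: PooledWorker.PoolFlow =
    Http()(context.system).cachedHostConnectionPool[Promise[HttpResponse]](host, port)

  private val router = Router(
    SmallestMailboxRoutingLogic(),
    Vector.fill(workerCount)(ActorRefRoutee(context.actorOf(PooledWorker.props(poolFlow)))))

  def receive: Receive = {
    case request: HttpRequest => router.route(request, sender())
  }
}
```

With this, each router would be started with something like
system.actorOf(PoolOwningRouter.props(coreServerIP, coreServerPort, 5)), so
the flow is created once per router actor. I left out the Terminated
handling from my router above to keep the sketch short.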


Regards,

Abhijith V R






