I was thinking of the solution using the wrong flow shapes.  I want to 
check the cache first and then depending on its outcome to either continue 
with the happy path or to hit the website.  I was able to accomplish this 
by looking at the FlexiRoute code in the "Custom Stream Processing" 
chapter.  Here is my modified Unzip class:

class Unzip extends FlexiRoute[(String, Option[Content]), UnzipShape[String, 
Option[Content]]](
  new UnzipShape, Attributes.name("Unzip")) {

  import FlexiRoute._

  override def createRouteLogic(p: PortT) = new RouteLogic[(String, Option[
Content])] {
    override def initialState =
      State[Any](DemandFromAll(p.outA, p.outB)) {
        (ctx, _, element) =>
          val (a, b) = element
          b match {
            case Some(i) => ctx.emit(p.outB)(b)
            case None => ctx.emit(p.outA)(a)
          }
          SameState
      }

    override def initialCompletionHandling = eagerClose
  }
}

And then a sample App:

object CacheExample extends App {

  implicit val as = ActorSystem()
  implicit val ec = as.dispatcher
  implicit val mat = ActorMaterializer()

  val fetchUrl = Flow[String]
    .mapAsync(2)(url => Future.successful {
    println("-- fetching from URL")
    Some(Content(BigDecimal(Random.nextLong()).abs))
  })

  val fetchCache = Flow[String]
    .mapAsync(1)(url => Future.successful {
    if (Random.nextBoolean()) {
      println("-- fetching from cache")
      Some(Content(BigDecimal(Random.nextLong()).abs))
    } else {
      None
    }
  })

  val cacheFlow: Graph[FlowShape[String, Option[Content]], Unit] = FlowGraph
.partial() { implicit b =>
    import FlowGraph.Implicits._

    val bcase = b.add(Broadcast[String](2))
    val zip = b.add(Zip[String, Option[Content]]())
    val unzip = b.add(new Unzip())
    val merge = b.add(Merge[Option[Content]](2))

    unzip.outA ~> fetchUrl ~> merge
    unzip.outB ~> merge

    bcase ~> zip.in0
    bcase ~> fetchCache ~> zip.in1

    zip.out ~> unzip.in

    FlowShape(bcase.in, merge.out)
  }

  val f = Source.single(List("/bob", "/nancy", "/jane")).mapConcat(identity)
    .via(cacheFlow)
    .runFold(BigDecimal("0"))((totalSalaries, contentOption) => 
contentOption.fold(totalSalaries)(content => totalSalaries + content.salary
))

  f.onComplete {
    case Success(totalSalaries) =>
      println(s"Total salaries $totalSalaries")
    case Failure(e) =>
      e.printStackTrace()
  }

}

Seamus

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to