I was approaching the solution with the wrong flow shapes. I want to
check the cache first and then, depending on the outcome, either continue
with the happy path or hit the website. I was able to accomplish this
by looking at the FlexiRoute code in the "Custom Stream Processing"
chapter. Here is my modified Unzip class:
/**
 * Routes every incoming `(String, Option[Content])` pair to exactly one outlet.
 *
 * When the optional content is present, the option itself is emitted on
 * `outB`; when it is absent, the string half of the pair is emitted on `outA`.
 * Despite the name, a pair is never split across both outlets.
 */
class Unzip extends FlexiRoute[(String, Option[Content]), UnzipShape[String, Option[Content]]](
  new UnzipShape, Attributes.name("Unzip")) {
  import FlexiRoute._

  override def createRouteLogic(p: PortT) = new RouteLogic[(String, Option[Content])] {
    override def initialCompletionHandling = eagerClose

    // The read condition lists both outlets, so whichever one the element is
    // routed to is covered by the condition before the element is handled.
    override def initialState =
      State[Any](DemandFromAll(p.outA, p.outB)) { (ctx, _, element) =>
        val (key, cached) = element
        if (cached.isDefined) ctx.emit(p.outB)(cached)
        else ctx.emit(p.outA)(key)
        SameState
      }
  }
}
And then a sample App:
/**
 * Sample application: for each URL, consult a (simulated) cache first and
 * only fall back to a (simulated) remote fetch on a cache miss, then sum the
 * salaries of everything that was retrieved and print the total.
 *
 * The actor system is shut down once the stream's result has been reported,
 * so the JVM can exit instead of being kept alive by the system's threads.
 */
object CacheExample extends App {
  implicit val as = ActorSystem()
  implicit val ec = as.dispatcher
  implicit val mat = ActorMaterializer()

  // Simulated remote fetch: logs the access and always produces content.
  val fetchUrl = Flow[String]
    .mapAsync(2)(_ => Future.successful {
      println("-- fetching from URL")
      Some(Content(BigDecimal(Random.nextLong()).abs))
    })

  // Simulated cache lookup: randomly a hit (Some) or a miss (None).
  val fetchCache = Flow[String]
    .mapAsync(1)(_ => Future.successful {
      if (Random.nextBoolean()) {
        println("-- fetching from cache")
        Some(Content(BigDecimal(Random.nextLong()).abs))
      } else {
        None
      }
    })

  // Graph wiring:
  //   in ~> bcase ~> ------------------> zip.in0
  //         bcase ~> fetchCache ~> zip.in1
  //   zip.out ~> unzip ~> outA (miss) ~> fetchUrl ~> merge ~> out
  //              unzip ~> outB (hit)  -------------> merge
  val cacheFlow: Graph[FlowShape[String, Option[Content]], Unit] = FlowGraph
    .partial() { implicit b =>
      import FlowGraph.Implicits._

      val bcase = b.add(Broadcast[String](2))
      val zip = b.add(Zip[String, Option[Content]]())
      val unzip = b.add(new Unzip())
      val merge = b.add(Merge[Option[Content]](2))

      // Pair every URL with the result of its cache lookup.
      bcase ~> zip.in0
      bcase ~> fetchCache ~> zip.in1
      zip.out ~> unzip.in

      // Cache misses go to the remote fetch; cache hits bypass it.
      unzip.outA ~> fetchUrl ~> merge
      unzip.outB ~> merge

      FlowShape(bcase.in, merge.out)
    }

  // The fold only sums, so the order in which results reach it is irrelevant.
  val f = Source(List("/bob", "/nancy", "/jane"))
    .via(cacheFlow)
    .runFold(BigDecimal("0"))((totalSalaries, contentOption) =>
      contentOption.fold(totalSalaries)(content => totalSalaries + content.salary))

  f.onComplete { result =>
    try {
      result match {
        case Success(totalSalaries) =>
          println(s"Total salaries $totalSalaries")
        case Failure(e) =>
          e.printStackTrace()
      }
    } finally {
      // Without this the actor system keeps running and the application never
      // exits. `shutdown()` is the Akka 2.3 API that akka-stream 1.0 builds
      // on; on Akka 2.4+ the replacement is `terminate()`.
      as.shutdown()
    }
  }
}
Seamus
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.