With the stream setup below, if my final step before the sink uses mapAsync and
calls a method that returns a Future, the stream never executes and I see
dead-letter messages in the log. If I change the doPublish step to use map()
instead, everything works fine. Ideally, though, I'd like to emit a Success or
Failure based on the Future returned in doPublish without adding explicit
blocking. Is this a known issue?
Thanks!
lazy val source = Source.actorPublisher[Job](MySource.props())

lazy val logSink = Sink.foreach((result: Result[FlowError, FlowDataComplete]) =>
  if (result.isLeft) {
    result.leftMap(fe => {
      logger.warn(s"FlowError ${fe.message}")
      sendAck(fe.ackData)
    })
  } else {
    logger.debug("logSink received result")
    result.map(h => {
      logger.debug(s"logSink sending ack ${h.ackData}")
      sendAck(h.ackData)
    })
  }
)

val flow = source via step1 via step2 via step3 via step4 via
  publisher.doPublish to logSink

lazy val sourceRef = flow.run()

lazy val priceEventStreamReader = system.actorOf(
  MyKinesisStreamReader.props(pricingConfig.kinesisConfig, metricsRegistry,
    sourceRef))
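
To make the goal concrete, here is a minimal sketch of the non-blocking
Success/Failure result described above. It uses only the Scala standard
library. The names PublishSketch, publish and publishAsTry are hypothetical
stand-ins (the real doPublish is not shown here), and the mapAsync call in the
comment assumes an Akka Streams version whose mapAsync takes a parallelism
argument.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object PublishSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the real publish call: fails on an empty payload.
  def publish(payload: String): Future[Int] =
    if (payload.nonEmpty) Future.successful(payload.length)
    else Future.failed(new IllegalArgumentException("empty payload"))

  // Lift the outcome into the value without blocking: the returned future
  // completes normally with Success(result) or Failure(error), so a
  // downstream stage sees the failure as data rather than a failed future.
  def publishAsTry(payload: String): Future[Try[Int]] =
    publish(payload)
      .map[Try[Int]](n => Success(n))
      .recover { case NonFatal(e) => Failure(e) }

  // Assumed usage inside a stream (not compiled here, requires Akka Streams):
  //   Flow[String].mapAsync(parallelism = 4)(publishAsTry)
}
```

Nothing here waits on the future. The map and recover callbacks run on the
execution context once the publish call completes, which is the behaviour I am
hoping to get out of the mapAsync step.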