I am trying to implement a customized Sink, in which I need to deal with 
Future, but I don't know how to do. Could you please help me?

class EventSink(...) {

  val in: Inlet[EventEnvelope2] = Inlet("EventSink")
  override val shape: SinkShape[EventEnvelope2] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): 
GraphStageLogic = {
    new GraphStageLogic(shape) {

      // This requests one element at the Sink startup.
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val future = handle(grab(in))
          Await.ready(future, Duration.Inf)
          future.onComplete {
            case Success(_) =>
              logger.info("pulling next events")
            case Failure(failure) =>
              logger.error(failure.getMessage, failure)
              throw failure

  private def handle(envelope: EventEnvelope2): Future[Unit] = {
    val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, 
_/*sequenceNr*/, event) = envelope

I currently have to block the future, as with the one that I commented out, 
it only received the first event and stopped. Is there a non-blocking way?


>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to