I am trying to implement a customized Sink, in which I need to deal with 
Future, but I don't know how to do. Could you please help me?

class EventSink(...) {

  val in: Inlet[EventEnvelope2] = Inlet("EventSink")
  override val shape: SinkShape[EventEnvelope2] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): 
GraphStageLogic = {
    new GraphStageLogic(shape) {

      // This requests one element at the Sink startup.
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val future = handle(grab(in))
          Await.ready(future, Duration.Inf)
          future.onComplete {
            case Success(_) =>
              logger.info("pulling next events")
            case Failure(failure) =>
              logger.error(failure.getMessage, failure)
              throw failure

  private def handle(envelope: EventEnvelope2): Future[Unit] = {
    val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, 
_/*sequenceNr*/, event) = envelope

I currently have to block the future, as with the one that I commented out, 
it only received the first event and stopped. Is there a non-blocking way?


