I'd like to create a Flow[HttpRequest, HttpResponse] by joining a BidiFlow 
built from a custom GraphStage with a connection pool flow built with Akka 
Http's request-level client-side API.  When running the stream I am getting 
the below exception.  If I replace the BidiFlow built from a custom 
GraphStage with a BidiFlow built via the BidiFlow.fromFunctions api (with 
the same functionality), it succeeds.  As far as I can tell the BidiFlows 
are functionally equivalent.  Included below is a fully runnable Scala file 
demonstrating the success/failure of the two BidiFlows, both of which 
simply forward requests as-is. 

Exception in thread "main" java.util.NoSuchElementException: head of empty 
stream
at 
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2$$anonfun$apply$3.apply(Sink.scala:128)
at 
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2$$anonfun$apply$3.apply(Sink.scala:128)
at scala.Option.getOrElse(Option.scala:121)
at 
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2.apply(Sink.scala:128)
at 
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2.apply(Sink.scala:128)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at 
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:76)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at 
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:75)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.trySuccess(Promise.scala:94)
at 
scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:153)
at 
akka.stream.impl.HeadOptionStage$$anon$3.onUpstreamFinish(Sinks.scala:255)
at 
akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:732)
at 
akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
at 
akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
at 
akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:381)
at 
akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)
at 
akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)
at 
akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.create(ActorCell.scala:590)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


package com.workday.excalibur.http

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream._
import akka.stream.scaladsl.{BidiFlow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}


object TestBidiToConnectionPool extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  // connection pool flow
  val connectionPoolFlow = Http().superPool[Int]()

  // Bidi from functions join connection pool
  val flowFromFunctionsBidi = BidiFlow.fromFunctions((request: (HttpRequest, 
Int)) ⇒ request, (response: (Try[HttpResponse], Int)) ⇒ 
response).join(connectionPoolFlow)

  // Bidi from custom GraphStage join connection pool
  val flowFromCustomGraphStageBidi = BidiFlow.fromGraph(new 
CustomIdentityBidi).join(connectionPoolFlow)

  // SUCCESS -> Submit HttpRequest to flow built with functions Bidi
  val responseFuture = Source.single(HttpRequest(uri = "http://httpbin.org/ip";) 
-> 42).via(flowFromFunctionsBidi).runWith(Sink.head)
  Await.result(responseFuture, 10 seconds) match {
    case (tryResponse, context) ⇒ {
      tryResponse match {
        case Success(httpResponse) ⇒ {
          val entity = 
Await.result(httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _), 3 
second).utf8String
          println("Success: " + entity)
        }
        case Failure(t) ⇒ {
          println("Error occurred: " + t)
        }
      }
    }
  }

  // FAILURE -> Submit HttpRequest to flow built with custom GraphStage
  // Exception in thread "main" java.util.NoSuchElementException: head of empty 
stream
  val responseFuture2 = Source.single(HttpRequest(uri = 
"http://httpbin.org/ip";) -> 
42).via(flowFromCustomGraphStageBidi).runWith(Sink.head)
  Await.result(responseFuture2, 10 seconds) match {
    case (tryResponse, context) ⇒ {
      tryResponse match {
        case Success(httpResponse) ⇒ {
          val entity = 
Await.result(httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _), 3 
second).utf8String
          println("Success: " + entity)
        }
        case Failure(t) ⇒ {
          println("Error occurred: " + t)
        }
      }
    }
  }
}

class CustomIdentityBidi extends GraphStage[BidiShape[(HttpRequest, Int), 
(HttpRequest, Int), (Try[HttpResponse], Int), (Try[HttpResponse], Int)]] {
  val requestIn = Inlet[(HttpRequest, Int)]("requestIn")
  val requestOut = Outlet[(HttpRequest, Int)]("requestOut")
  val responseIn = Inlet[(Try[HttpResponse], Int)]("responseIn")
  val responseOut = Outlet[(Try[HttpResponse], Int)]("responseOut")

  override val shape = BidiShape.of(requestIn, requestOut, responseIn, 
responseOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
new GraphStageLogic(shape) {
    setHandler(requestIn, new InHandler {
      override def onPush(): Unit = {
        push(requestOut, grab(requestIn))
      }
    })

    setHandler(requestOut, new OutHandler {
      override def onPull(): Unit = {
        pull(requestIn)
      }
    })

    setHandler(responseIn, new InHandler {
      override def onPush(): Unit = {
        push(responseOut, grab(responseIn))
      }
    })

    setHandler(responseOut, new OutHandler {
      override def onPull(): Unit = {
        pull(responseIn)
      }
    })
  }
}


-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to