I'd like to create a Flow[HttpRequest, HttpResponse] by joining a BidiFlow
built from a custom GraphStage with a connection pool flow built with Akka
Http's request-level client-side API. When running the stream I am getting
the below exception. If I replace the BidiFlow built from a custom
GraphStage with a BidiFlow built via the BidiFlow.fromFunctions api (with
the same functionality), it succeeds. As far as I can tell the BidiFlows
are functionally equivalent. Included below is a fully runnable Scala file
demonstrating the success/failure of the two BidiFlows, both of which
simply forward requests as-is.
Exception in thread "main" java.util.NoSuchElementException: head of empty
stream
at
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2$$anonfun$apply$3.apply(Sink.scala:128)
at
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2$$anonfun$apply$3.apply(Sink.scala:128)
at scala.Option.getOrElse(Option.scala:121)
at
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2.apply(Sink.scala:128)
at
akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2.apply(Sink.scala:128)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:76)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:75)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.trySuccess(Promise.scala:94)
at
scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:153)
at
akka.stream.impl.HeadOptionStage$$anon$3.onUpstreamFinish(Sinks.scala:255)
at
akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:732)
at
akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
at
akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
at
akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:381)
at
akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)
at
akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)
at
akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.create(ActorCell.scala:590)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
package com.workday.excalibur.http
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream._
import akka.stream.scaladsl.{BidiFlow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
object TestBidiToConnectionPool extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// connection pool flow
val connectionPoolFlow = Http().superPool[Int]()
// Bidi from functions join connection pool
val flowFromFunctionsBidi = BidiFlow.fromFunctions((request: (HttpRequest,
Int)) ⇒ request, (response: (Try[HttpResponse], Int)) ⇒
response).join(connectionPoolFlow)
// Bidi from custom GraphStage join connection pool
val flowFromCustomGraphStageBidi = BidiFlow.fromGraph(new
CustomIdentityBidi).join(connectionPoolFlow)
// SUCCESS -> Submit HttpRequest to flow built with functions Bidi
val responseFuture = Source.single(HttpRequest(uri = "http://httpbin.org/ip")
-> 42).via(flowFromFunctionsBidi).runWith(Sink.head)
Await.result(responseFuture, 10 seconds) match {
case (tryResponse, context) ⇒ {
tryResponse match {
case Success(httpResponse) ⇒ {
val entity =
Await.result(httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _), 3
second).utf8String
println("Success: " + entity)
}
case Failure(t) ⇒ {
println("Error occurred: " + t)
}
}
}
}
// FAILURE -> Submit HttpRequest to flow built with custom GraphStage
// Exception in thread "main" java.util.NoSuchElementException: head of empty
stream
val responseFuture2 = Source.single(HttpRequest(uri =
"http://httpbin.org/ip") ->
42).via(flowFromCustomGraphStageBidi).runWith(Sink.head)
Await.result(responseFuture2, 10 seconds) match {
case (tryResponse, context) ⇒ {
tryResponse match {
case Success(httpResponse) ⇒ {
val entity =
Await.result(httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _), 3
second).utf8String
println("Success: " + entity)
}
case Failure(t) ⇒ {
println("Error occurred: " + t)
}
}
}
}
}
class CustomIdentityBidi extends GraphStage[BidiShape[(HttpRequest, Int),
(HttpRequest, Int), (Try[HttpResponse], Int), (Try[HttpResponse], Int)]] {
val requestIn = Inlet[(HttpRequest, Int)]("requestIn")
val requestOut = Outlet[(HttpRequest, Int)]("requestOut")
val responseIn = Inlet[(Try[HttpResponse], Int)]("responseIn")
val responseOut = Outlet[(Try[HttpResponse], Int)]("responseOut")
override val shape = BidiShape.of(requestIn, requestOut, responseIn,
responseOut)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(requestIn, new InHandler {
override def onPush(): Unit = {
push(requestOut, grab(requestIn))
}
})
setHandler(requestOut, new OutHandler {
override def onPull(): Unit = {
pull(requestIn)
}
})
setHandler(responseIn, new InHandler {
override def onPush(): Unit = {
push(responseOut, grab(responseIn))
}
})
setHandler(responseOut, new OutHandler {
override def onPull(): Unit = {
pull(responseIn)
}
})
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.