Lib dependency: "com.typesafe.akka" %% "akka-http" % "10.0.7"
Issue background: I am running a performance test against a custom WebSocket
proxy implemented with Akka HTTP. The test connects to the proxy as a client,
i.e. it exercises inbound messages. The WebSocket endpoint happens to be a
Riemann stream processor. A future in my client (a Scala test) sends events
to Riemann over Riemann TCP, and the events are read back over the WebSocket.
Riemann is mentioned for context only.
Regarding API usage:
Socket request:
    http.singleWebSocketRequest(
      WebSocketRequest(s"ws://${proxyDependency.wsHost}:${proxyDependency.wsPort}/${uriPath}"),
      clientFlow)
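For reference, singleWebSocketRequest returns a pair: a future of the upgrade response and the materialized value of the client flow. The sketch below shows one way to check that the upgrade succeeded before waiting for any messages. The names WsClientSketch, requireUpgraded and connect are hypothetical and not part of my proxy code, and the flow passed in stands for the clientFlow described in this post.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow

import scala.concurrent.Future

// Hypothetical helper object, for illustration only.
object WsClientSketch {

  // Returns Done if the server switched protocols, otherwise throws.
  def requireUpgraded(upgrade: WebSocketUpgradeResponse): Done =
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) Done
    else throw new RuntimeException(s"Upgrade failed: ${upgrade.response.status}")

  // Opens the connection and returns (connected, materialized value of the flow).
  def connect[T](uri: String, clientFlow: Flow[Message, Message, T])(
      implicit system: ActorSystem, mat: Materializer): (Future[Done], T) = {
    import system.dispatcher
    val (upgradeResponse, flowMat) =
      Http().singleWebSocketRequest(WebSocketRequest(uri), clientFlow)
    (upgradeResponse.map(requireUpgraded), flowMat)
  }
}
```

Checking the upgrade response separately makes it easier to tell a failed handshake apart from a stream that terminates later.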
Flow definition:
    Flow.fromSinkAndSource(defineSinkForIncoming(fn2HandleInBoundData), Source.maybe)
      .viaMat(KillSwitches.single)(Keep.right)
      .takeWhile(nothing => isSinkInUse.get)
      .watchTermination()((uniqueKillSwitch: UniqueKillSwitch, futureDone: Future[Done]) => {
        futureDone.map { result => {
            logThatMessage("Flow is terminated (probably disconnected by the Server or via kill switch.)",
              None, LogLevelIndicators.INFO, fallBackLogger, localizedLoggerFn)
            if (!promise2Listen.isCompleted) {
              promise2Listen success gBoxWebSocketProxyFlowInitStatus.FLOW_TERMINATION_DETECTED
            } else {
              logThatMessage("Promise to listen is already completed. Nothing to return here.",
                None, LogLevelIndicators.INFO, fallBackLogger, localizedLoggerFn)
            }
          }
        } recover {
          case ex: Throwable => {
            logThatMessage("Listen channel is already open.",
              Option(ex), LogLevelIndicators.ERROR, fallBackLogger, localizedLoggerFn)
            promise2Listen success gBoxWebSocketProxyFlowInitStatus.UNTRACKED_EXCEPTION_DETECTED
          }
        }
For context, the call to defineSinkForIncoming... returns a
Sink[Message, Future[Done]].
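One thing that may be relevant to the "Substream Source has not been materialized" error: in Akka HTTP an incoming WebSocket message can arrive as a streamed message, and its textStream or dataStream has to be consumed by the receiving side. I have not confirmed that this is the cause here. The sketch below shows a sink with the same Sink[Message, Future[Done]] shape that drains every message; DrainingSinkSketch, drainingSink and handle are hypothetical names and not my actual defineSinkForIncoming.

```scala
import akka.Done
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink}

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for defineSinkForIncoming, for illustration only.
object DrainingSinkSketch {

  // Passes each complete text payload to `handle`, one message at a time.
  def drainingSink(handle: String => Unit)(
      implicit mat: Materializer, ec: ExecutionContext): Sink[Message, Future[Done]] =
    Flow[Message]
      .mapAsync(parallelism = 1) {
        // textStream works for strict and streamed messages alike;
        // folding it concatenates all chunks of a message.
        case tm: TextMessage =>
          tm.textStream.runFold("")(_ + _).map(handle)
        // Binary payloads are ignored in this sketch, but their stream is still consumed.
        case bm: BinaryMessage =>
          bm.dataStream.runWith(Sink.ignore).map(_ => ())
      }
      .toMat(Sink.ignore)(Keep.right)
}
```

With parallelism = 1 the messages are handled in arrival order, and the next message is requested only after the current one has been fully read.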
I have seen earlier posts on this topic that are now closed, so I was
wondering whether anyone else has faced a similar issue.
Here's the stack trace:
[default-akka.actor.default-dispatcher-16] [akka://default/user/StreamSupervisor-5/flow-0-0-unknown-operation] Error in stage [Split]: OnError(akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds) (of class akka.stream.actor.ActorSubscriberMessage$OnError)
scala.MatchError: OnError(akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds) (of class akka.stream.actor.ActorSubscriberMessage$OnError)
    at akka.stream.impl.fusing.SubSource.failSubstream(StreamOfStreams.scala:706)
    at akka.stream.stage.GraphStageLogic$SubSourceOutlet.fail(GraphStage.scala:1138)
    at akka.stream.impl.fusing.Split$$anon$2$SubstreamHandler.onUpstreamFailure(StreamOfStreams.scala:552)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:733)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
    at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)