Hi
I've define a flow like this:
def makeAST = Flow[String].log("Parse Json") map { json =>
Try {
val jsonMessage = parse(json)
val jsonNode = asJsonNode(jsonMessage)
val report = validator.validate(jsonSchema, jsonNode)
if (!report.isSuccess) throw new Exception("Validation of Json fail")
jsonMessage
}
}
So I need to check if json is valid then if yes I continue the flow else I
need to reply a 500 error
to make this with akka-flow i'm using this
val A = builder.add(makeAST.named("makeAST"))
val B = builder.add(filterPartialFlow(j => j.isSuccess))
where filterPartialFlow is
def filterPartialFlow(filterFunction: T => Boolean) = FlowGraph.create() {
implicit b =>
val bcast = b.add(Broadcast[T](2))
val filter = b.add(Flow[T] filter (filterFunction(_)))
val notFilter = b.add(Flow[T] filter (!filterFunction(_)))
bcast ~> filter
bcast ~> notFilter
UniformFanOutShape(bcast.in, filter.outlet, notFilter.outlet)
}
When I throw the Exception I arrive to another flow that make something
like this
def sendHTTPErrorBack = Flow[Try[JValue]]
.mapAsync[HttpResponse](10) { v =>
Future(HttpResponse(status = StatusCodes.BadRequest, entity = HttpEntity
("Invalid Json Error")))
}.withAttributes(supervisionStrategy(resumingDecider))
In the route I define something like this:
flows.kafkaFlow(json)
.withAttributes(ActorAttributes.supervisionStrategy(
decider))
.runWith(Sink.head[HttpResponse]): Future[HttpResponse
]
The problem is that I never send back my response only 500 with message
There was an internal server error :
with exception
java.util.NoSuchElementException: empty stream
at akka.stream.impl.HeadSink$HeadSinkSubscriber.onComplete(Sinks.scala:119)
at
akka.stream.impl.ReactiveStreamsCompliance$.tryOnComplete(ReactiveStreamsCompliance.scala:104)
at
akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary.akka$stream$impl$fusing$ActorGraphInterpreter$ActorOutputBoundary$$complete(ActorGraphInterpreter.scala:220)
at
akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary$$anon$2.onUpstreamFinish(ActorGraphInterpreter.scala:242)
at
akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:575)
at
akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:511)
at
akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$runBatch(ActorGraphInterpreter.scala:399)
at
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:371)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at
akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:291)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:487)
at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)
at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
at
akka.kamon.instrumentation.ActorCellInstrumentation$$anonfun$aroundBehaviourInvoke$1.apply(ActorCellInstrumentation.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:57)
at
akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorCellInstrumentation.scala:61)
at akka.actor.ActorCell.invoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I dont' know how to change the flow behavior
Thanks for help
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.