Hi All,
I'm having a problem with Framing.delimiter when a frame exceeds
maximumFrameLength. In that case, I would expect to fall into the
decider, but instead I just get a nasty exception and my stream
processing terminates. Here is the code:
/**
 * Builds a graph that receives `ByteString` chunks through an actor-backed
 * source, runs them through `Gzip.decoderFlow`, splits the result into
 * `\r\n`-delimited frames and broadcasts every frame to a Kafka sink and to
 * a logging/metrics sink.
 *
 * @param config       configuration read for `kafka.brokers`, `kafka.topic`
 *                     and `kafka.clientId`
 * @param materializer materializer used when the graph is run by `build`
 */
class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: akka.stream.Materializer) {

  // Explicitly typed so the implicit is resolved predictably wherever it is needed.
  implicit val actorSystem: ActorSystem = ActorSystem("ReactiveKafka")

  private val kafka = new ReactiveKafka()
  private val log = Logger.getLogger(this.getClass.getSimpleName)

  // Used both as the size of the source buffer and as the maximum frame length below.
  private val bufferSize = 25000

  // Actor-backed entry point of the graph; configured with OverflowStrategy.dropHead.
  private val source = Source.actorRef[ByteString](
    bufferSize,
    OverflowStrategy.dropHead
  )

  /**
   * Supervision decider attached to the graph in `build`. Every failure it
   * sees is logged (with its stack trace, through the class logger) and
   * answered with `Supervision.resume`.
   *
   * NOTE(review): whether a given stage consults this decider depends on the
   * stage itself; confirm in particular that `Framing.delimiter` honours the
   * supervision attribute before relying on it for oversized frames.
   */
  val decider: Supervision.Decider = {
    case e: DataFormatException =>
      // Passing the throwable to the logger already records the stack trace.
      log.log(Level.SEVERE, "Non GZIP format", e)
      Supervision.resume
    case e =>
      FalconStatsD.increment("insideSupervisionDecider")
      log.log(Level.INFO, "INTO THE DECIDER", e)
      Supervision.resume
  }

  private val runnableByteStringProcessingFlow =
    FlowGraph.closed(source) { implicit builder =>
      byteStringSource =>
        val tweetBroadCaster = builder.add(Broadcast[String](2))

        // Split the decompressed bytes on "\r\n" and re-append the delimiter,
        // so an empty frame (keep-alive) becomes exactly "\r\n".
        val byteStringToTweetFlow = Flow[ByteString]
          .via(Framing.delimiter(
            ByteString("\r\n"),
            maximumFrameLength = bufferSize,
            allowTruncation = true))
          .map(_.utf8String + "\r\n")

        // Counts keep-alives and activities separately; activities are also logged.
        val tweetPrintSink = Sink.foreach[String] {
          case "\r\n" =>
            FalconStatsD.increment("keepAliveReceivedFromGnipCompliance")
          case a =>
            log.log(Level.INFO, a)
            FalconStatsD.increment("activityReceivedFromGnipCompliance")
        }

        val producerProperties = ProducerProperties(
          brokerList = config.getStringList("kafka.brokers").toList.mkString(","),
          topic = config.getString("kafka.topic"),
          clientId = config.getString("kafka.clientId"),
          encoder = new StringEncoder())
        val kafkaSink = Sink(kafka.publish(producerProperties))

        byteStringSource.outlet ~> Gzip.decoderFlow ~> byteStringToTweetFlow ~> tweetBroadCaster.in
        tweetBroadCaster.out(0) ~> kafkaSink
        tweetBroadCaster.out(1) ~> tweetPrintSink
    }

  /**
   * Runs the graph with `decider` installed as its supervision strategy.
   *
   * @return the `ActorRef` materialized by the actor-backed source; messages
   *         sent to it are fed into the graph
   */
  def build: ActorRef =
    runnableByteStringProcessingFlow
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .run()
}
[ERROR] [09/18/2015 10:31:24.636] [ReactiveKafka-akka.actor.default-
dispatcher-3] [akka://ReactiveKafka/user/$a] Read 6617 bytes which is more
than 180 without seeing a line terminator
akka.stream.io.Framing$FramingException: Read 6617 bytes which is more than
180 without seeing a line terminator
at akka.stream.io.Framing$DelimiterFramingStage.doParse(Framing.scala:
172)
at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:147
)
at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:138
)
at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter
.scala:436)
at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(
Interpreter.scala:245)
at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(
Interpreter.scala:434)
at akka.stream.impl.fusing.OneBoundedInterpreter.
akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580
)
at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(
Interpreter.scala:241)
at akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(
Interpreter.scala:666)
at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
at akka.stream.impl.fusing.
BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(
ActorInterpreter.scala:157)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.
scala:36)
at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(
ActorInterpreter.scala:366)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
ForkJoinWorkerThread.java:107)
I've tried multiple things, including splitting the flow into small pieces
and throwing exceptions at different stages with something like:
Flow[someType].map{x => {throw new Exception; x}}
In those cases I always end up inside the decider, but not when the problem
is the FramingException thrown from Framing.
Any ideas?
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.