Hi All,

I'm having a problem with Framing.delimiter when a frame exceeds the
maximumFrameLength. In such a case, I'm expecting to fall into the
decider, but instead I just get a nasty exception and the termination of my
stream processing.

/**
 * Builds a runnable Akka Streams graph that receives GZIP-compressed
 * ByteStrings on an actor-backed Source, decompresses them, splits the
 * stream into CRLF-delimited lines, and broadcasts each line to both a
 * Kafka sink and a logging/metrics sink.
 *
 * NOTE(review): the reported symptom — FramingException never reaching the
 * supervision decider — is consistent with Framing.delimiter in this Akka
 * version failing the stream directly from inside the framing stage rather
 * than routing the error through the supervision strategy; recovering from
 * an over-long frame would need explicit failure handling (e.g. `recover`)
 * instead of `Supervision.resume`. TODO: confirm against the Akka Streams
 * docs for the version in use.
 *
 * NOTE(review): the attached error says "more than 180", but this code sets
 * maximumFrameLength = bufferSize (25000) — the log likely comes from a run
 * with a different configuration; verify which build produced it.
 */
class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: akka.stream.Materializer) {
  implicit val actorSystem = ActorSystem("ReactiveKafka")
  private val kafka = new ReactiveKafka()
  private val log = Logger.getLogger(this.getClass.getSimpleName)

  // Capacity of the actor-backed source's buffer; also reused below as the
  // maximum frame length for line framing.
  private val bufferSize = 25000
  private val source = Source.actorRef[ByteString](
    bufferSize,
    OverflowStrategy.dropHead
  )

  /**
   * Supervision decider: logs the failure and resumes the stream.
   * GZIP format errors are reported at SEVERE; anything else is counted
   * via StatsD and logged at INFO.
   */
  val decider: Supervision.Decider = {
    case e: DataFormatException =>
      log.log(Level.SEVERE, "Non GZIP format", e)
      e.getStackTrace.foreach(println)
      Supervision.resume
    case e =>
      FalconStatsD.increment("insideSupervisionDecider")
      Logger.getLogger("some").log(Level.INFO, "INTO THE DECIDER")
      // FIX: removed the dead `e.getMessage` expression — its result was
      // discarded, so it had no effect.
      e.getStackTrace.foreach(println)
      Supervision.resume
  }

  // source ~> gunzip ~> CRLF framing ~> broadcast ~> (kafka, print sink)
  private val runnableByteStringProcessingFlow =
    FlowGraph.closed(source) { implicit builder => byteStringSource =>
      val tweetBroadCaster = builder.add(Broadcast[String](2))

      // Split the decompressed bytes on CRLF and re-append the delimiter so
      // that bare keep-alive frames remain distinguishable downstream
      // (they arrive as exactly "\r\n").
      val byteStringToTweetFlow = Flow[ByteString]
        .via(Framing.delimiter(
          ByteString("\r\n"),
          maximumFrameLength = bufferSize,
          allowTruncation = true))
        .map(_.utf8String + "\r\n")

      // Keep-alives are only counted; real activities are logged in full
      // and counted separately.
      val tweetPrintSink = Sink.foreach[String] {
        case "\r\n" =>
          FalconStatsD.increment("keepAliveReceivedFromGnipCompliance")
        case a =>
          log.log(Level.INFO, a)
          FalconStatsD.increment("activityReceivedFromGnipCompliance")
      }

      val producerProperties = ProducerProperties(
        brokerList = config.getStringList("kafka.brokers").toList.mkString(","),
        topic = config.getString("kafka.topic"),
        clientId = config.getString("kafka.clientId"),
        encoder = new StringEncoder())

      val kafkaSink = Sink(kafka.publish(producerProperties))

      byteStringSource.outlet ~> Gzip.decoderFlow ~> byteStringToTweetFlow ~> tweetBroadCaster.in
      tweetBroadCaster.out(0) ~> kafkaSink
      tweetBroadCaster.out(1) ~> tweetPrintSink
    }

  /**
   * Materializes the graph with the supervision decider attached and
   * returns the ActorRef backing the source (messages sent to it enter
   * the stream).
   */
  def build: ActorRef =
    runnableByteStringProcessingFlow
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .run()
}

[ERROR] [09/18/2015 10:31:24.636] [ReactiveKafka-akka.actor.default-
dispatcher-3] [akka://ReactiveKafka/user/$a] Read 6617 bytes which is more 
than 180 without seeing a line terminator
akka.stream.io.Framing$FramingException: Read 6617 bytes which is more than 
180 without seeing a line terminator
    at akka.stream.io.Framing$DelimiterFramingStage.doParse(Framing.scala:
172)
    at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:147
)
    at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:138
)
    at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter
.scala:436)
    at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(
Interpreter.scala:245)
    at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(
Interpreter.scala:434)
    at akka.stream.impl.fusing.OneBoundedInterpreter.
akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580
)
    at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(
Interpreter.scala:241)
    at akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(
Interpreter.scala:666)
    at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
    at akka.stream.impl.fusing.
BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(
ActorInterpreter.scala:157)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.
scala:36)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(
ActorInterpreter.scala:366)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
ForkJoinWorkerThread.java:107)

I've tried multiple things, from splitting the flow into little pieces in
which I'd throw exceptions at different stages with something like:

Flow[someType].map{x => {throw new Exception; x}}


In those cases I always end up inside the decider — but not when the problem
is this FramingException thrown from Framing.

Any ideas?

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to