On Friday, September 18, 2015 at 12:51:35 PM UTC+2, drewhk wrote:
>
> Hi Ivan,
>
> On Fri, Sep 18, 2015 at 12:48 PM, Ivan Baisi <[email protected] 
> <javascript:>> wrote:
>
> Hey Endre,
>
> Thanks for your reply.
>
> Well, I would've expected that I could catch that FramingException and 
> decide myself what to do instead of having my stream broken. There's 
> probably something I'm missing with the supervisor. So far I've been using 
> it inside my flows to deide on what to do with different exceptions within 
> my stream. Things that go from DB exceptions to Json parse exceptions 
> without any issue. My question is why doesn't this FramingException get 
> 'caught' in the decider as well.
>
>
> The reason is that the DelimiterFraming has no support for supervision. 
> Not all stages support supervision because sometimes it is not possible to 
> implement them meaningfully. What would you expect the DelimiterFraming to 
> do after a line happens to be too large?
>  
>
>
> I'm going into the akka streams documentation again, to check if I'm not 
> taking something into account. Is there any other resource that could be 
> useful? 
>
>
> I think I've seen something similar with this FramingException when 
> there's some exception with Gzip.decoderFlow due to some bad input. In this 
> case, I'd expect the same, being able to decide myself what to do instead 
> of having the stream terminated.
>
>
> What do you mean by "decide what to do"? Can you give an example what you 
> can do with a malformed gzip stream or a too large line?
>

Drop the element and continue processing the next elements. If I get a too 
large line, for example, I'd like to drop it and add a metric where later I 
can see how many too long lines I'm getting.
 

>
> -Endre
>  
>
>
> Cheers!
>
> On Friday, September 18, 2015 at 11:29:54 AM UTC+2, drewhk wrote:
>
> Hi Ivan,
>
> Supervision is not something that automatically works on any stage, 
> unfortunately, it always needs custom work, and there are cases where it 
> can't work at all (not even theoretically). I am not sure what should a 
> delimiter framing do when a line-size has been exceeded for example? What 
> do you expect to happen?
>
> -Endre
>
> On Fri, Sep 18, 2015 at 10:36 AM, Ivan Baisi <[email protected]> 
> wrote:
>
> Hi All,
>
> I'm having some problem with the Framing.delimiter when it exceeds the 
> maximumFrameLength. In such case, I'm just expecting to fall into the 
> decider but instead I just get a nasty exception and the termination of my 
> stream processing.
>
> class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: 
> akka.stream.Materializer) {
>   implicit val actorSystem = ActorSystem("ReactiveKafka")
>   private val kafka = new ReactiveKafka()
>   private val log = Logger.getLogger(this.getClass.getSimpleName)
>
>   private val bufferSize = 25000
>   private val source = Source.actorRef[ByteString](
>     bufferSize,
>     OverflowStrategy.dropHead
>   )
>
>   val decider: Supervision.Decider = {
>     case e: DataFormatException =>
>       log.log(Level.SEVERE, "Non GZIP format", e)
>       e.getStackTrace.foreach(println)
>       Supervision.resume
>     case e =>
>       FalconStatsD.increment("insideSupervisionDecider")
>       Logger.getLogger("some").log(Level.INFO, "INTO THE DECIDER")
>       e.getMessage
>       e.getStackTrace.foreach(println)
>       Supervision.resume
>   }
>
>   private val runnableByteStringProcessingFlow =
>     FlowGraph.closed(source) { implicit builder =>
>       byteStringSource =>
>         val tweetBroadCaster = builder.add(Broadcast[String](2))
>
>         val byteStringToTweetFlow = Flow[ByteString].
>           via(Framing.delimiter(
>             ByteString("\r\n"), maximumFrameLength = bufferSize, 
> allowTruncation = true))
>           .map(_.utf8String + "\r\n")
>
>         val tweetPrintSink = Sink.foreach[String] {
>           case "\r\n" => FalconStatsD.increment(
> "keepAliveReceivedFromGnipCompliance")
>           case a => {
>             log.log(Level.INFO, a)
>             FalconStatsD.increment("activityReceivedFromGnipCompliance")
>           }
>         }
>
>         val producerProperties = ProducerProperties(
>           brokerList = config.getStringList("kafka.brokers").toList.
> mkString(","),
>           topic = config.getString("kafka.topic"),
>           clientId = config.getString("kafka.clientId"),
>           encoder = new StringEncoder())
>
>         val kafkaSink = Sink(kafka.publish(producerProperties))
>
>         byteStringSource.outlet ~> Gzip.decoderFlow ~> 
> byteStringToTweetFlow ~> tweetBroadCaster.in
>         tweetBroadCaster.out(0) ~> kafkaSink
>         tweetBroadCaster.out(1) ~> tweetPrintSink
>     }
>
>   def build: ActorRef = runnableByteStringProcessingFlow.withAttributes(
> ActorAttributes.supervisionStrategy(decider)).run()
> }
>
> [ERROR] [09/18/2015 10:31:24.636] [ReactiveKafka-akka.actor.default-
> dispatcher-3] [akka://ReactiveKafka/user/$a] Read 6617 bytes which is 
> more than 180 without seeing a line terminator
> akka.stream.io.Framing$FramingException: Read 6617 bytes which is more 
> than 180 without seeing a line terminator
>     at akka.stream.io.Framing$DelimiterFramingStage.doParse(Framing.scala:
> 172)
>     at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:
> 147)
>     at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:
> 138)
>     at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(
> Interpreter.scala:436)
>     at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(
> Interpreter.scala:245)
>     at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(
> Interpreter.scala:434)
>     at akka.stream.impl.fusing.OneBoundedInterpreter.
> akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:
> 580)
>     at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(
> Interpreter.scala:241)
>     at akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(
> Interpreter.scala:666)
>     at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
>     at akka.stream.impl.fusing.
> BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(
> ActorInterpreter.scala:157)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction
> .scala:36)
>     at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
>     at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(
> ActorInterpreter.scala:366)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260
> )
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
> ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
> 1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> I've tried multiple things, from striping the flow in little pieces in 
> which I'd throw exceptions at different stages with some:
>
> Flow[someType].map{x => {throw new Exception; x}}
>
>
> I always end up inside the decider but when the problem is with this 
> FraminException thrown from Framing
>
> Any ideas?
>
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>
>
> ...

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to