Thanks Endre,

On Friday, September 18, 2015 at 1:15:04 PM UTC+2, drewhk wrote:
> Hi Ivan,
>
> On Fri, Sep 18, 2015 at 1:02 PM, Ivan Baisi <[email protected]> wrote:
>> On Friday, September 18, 2015 at 12:51:35 PM UTC+2, drewhk wrote:
>>> Hi Ivan,
>>>
>>> On Fri, Sep 18, 2015 at 12:48 PM, Ivan Baisi <[email protected]> wrote:
>>>> Hey Endre,
>>>>
>>>> Thanks for your reply.
>>>>
>>>> Well, I would've expected that I could catch that FramingException and
>>>> decide myself what to do instead of having my stream broken. There's
>>>> probably something I'm missing with the supervisor. So far I've been
>>>> using it inside my flows to decide what to do with different exceptions
>>>> within my stream, from DB exceptions to JSON parse exceptions, without
>>>> any issue. My question is why this FramingException doesn't get caught
>>>> in the decider as well.
>>>
>>> The reason is that DelimiterFraming has no support for supervision. Not
>>> all stages support supervision, because sometimes it is not possible to
>>> implement it meaningfully. What would you expect DelimiterFraming to do
>>> after a line happens to be too large?
>>
>> I'm going through the Akka Streams documentation again, to check whether
>> I'm overlooking something. Is there any other resource that could be
>> useful?
>>
>> I think I've seen something similar to this FramingException when
>> Gzip.decoderFlow fails due to bad input. In that case I'd expect the
>> same: being able to decide myself what to do instead of having the
>> stream terminated.
>
> What do you mean by "decide what to do"? Can you give an example of what
> you can do with a malformed gzip stream or a too-large line?
>
>> Drop the element and continue processing the next elements. If I get a
>> too-large line, for example, I'd like to drop it and add a metric so I
>> can later see how many too-long lines I'm getting.
>
> You would map this behavior to the "Resume" directive?
> That might be possible, but I don't really like cramming all permutations
> of possible operations into the Akka Streams library (the similar Decoder
> in Netty also just fails on large frames).
>
> Btw, where would the metric go? Should the framing materialize to some
> counter?
>
> Currently, you can take the Stage implementation and add the skipping
> behavior yourself:
> https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/io/Framing.scala#L138
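The skipping behavior suggested above can be sketched independently of Akka's Stage API. The names below (`SkippingFraming`, `Result`) are illustrative only, not part of the Akka codebase: the core idea is to split on the delimiter and, when a frame exceeds `maxFrameLength`, count and drop it instead of failing the whole stream.

```scala
import scala.collection.mutable.ArrayBuffer

object SkippingFraming {
  final case class Result(frames: Vector[String], dropped: Int)

  // Split `input` on `delimiter`; frames longer than `maxFrameLength`
  // are dropped and counted rather than raising a FramingException.
  // A trailing chunk with no delimiter is left out (it would stay
  // buffered in a real streaming stage until more input arrives).
  def split(input: String, delimiter: String, maxFrameLength: Int): Result = {
    val frames = ArrayBuffer.empty[String]
    var dropped = 0
    var rest = input
    var done = false
    while (!done) {
      val idx = rest.indexOf(delimiter)
      if (idx < 0) done = true
      else {
        val frame = rest.substring(0, idx)
        if (frame.length > maxFrameLength) dropped += 1 // skip, don't fail
        else frames += frame
        rest = rest.substring(idx + delimiter.length)
      }
    }
    Result(frames.toVector, dropped)
  }
}
```

The `dropped` counter is where a metric increment would go; a real custom stage would hold the same counter as mutable stage state and emit only the surviving frames downstream.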
Yes, this is one of the things I tried, and it worked. I was just wondering
whether I was missing something that would let me use the Akka Framing as-is
and drop the element when encountering an exception, so I could add the
metric in the decider and do a simple Supervision.resume. Thanks :)

> -Endre
>
>>>> Cheers!
>>>>
>>>> On Friday, September 18, 2015 at 11:29:54 AM UTC+2, drewhk wrote:
>>>>> Hi Ivan,
>>>>>
>>>>> Supervision is not something that automatically works on any stage,
>>>>> unfortunately; it always needs custom work, and there are cases where
>>>>> it can't work at all (not even theoretically). I am not sure what a
>>>>> delimiter framing should do when a line size has been exceeded, for
>>>>> example. What do you expect to happen?
>>>>>
>>>>> -Endre
>>>>>
>>>>> On Fri, Sep 18, 2015 at 10:36 AM, Ivan Baisi <[email protected]>
>>>>> wrote:
>>>>>> Hi All,
>>>>>>
>>>>>> I'm having a problem with Framing.delimiter when it exceeds the
>>>>>> maximumFrameLength. In that case I expect to fall into the decider,
>>>>>> but instead I just get a nasty exception and the termination of my
>>>>>> stream processing.
class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: akka.stream.Materializer) {
  implicit val actorSystem = ActorSystem("ReactiveKafka")
  private val kafka = new ReactiveKafka()
  private val log = Logger.getLogger(this.getClass.getSimpleName)

  private val bufferSize = 25000
  private val source = Source.actorRef[ByteString](
    bufferSize,
    OverflowStrategy.dropHead
  )

  val decider: Supervision.Decider = {
    case e: DataFormatException =>
      log.log(Level.SEVERE, "Non GZIP format", e)
      e.getStackTrace.foreach(println)
      Supervision.resume
    case e =>
      FalconStatsD.increment("insideSupervisionDecider")
      Logger.getLogger("some").log(Level.INFO, "INTO THE DECIDER")
      e.getMessage
      e.getStackTrace.foreach(println)
      Supervision.resume
  }

  private val runnableByteStringProcessingFlow =
    FlowGraph.closed(source) { implicit builder => byteStringSource =>
      val tweetBroadCaster = builder.add(Broadcast[String](2))

      val byteStringToTweetFlow = Flow[ByteString]
        .via(Framing.delimiter(
          ByteString("\r\n"), maximumFrameLength = bufferSize, allowTruncation = true))
        .map(_.utf8String + "\r\n")

      val tweetPrintSink = Sink.foreach[String] {
        case "\r\n" =>
          FalconStatsD.increment("keepAliveReceivedFromGnipCompliance")
        case a =>
          log.log(Level.INFO, a)
          FalconStatsD.increment("activityReceivedFromGnipCompliance")
      }

      val producerProperties = ProducerProperties(
        brokerList = config.getStringList("kafka.brokers").toList.mkString(","),
        topic = config.getString("kafka.topic"),
        clientId = config.getString("kafka.clientId"),
        encoder = new StringEncoder())

      val kafkaSink = Sink(kafka.publish(producerProperties))

      byteStringSource.outlet ~> Gzip.decoderFlow ~> byteStringToTweetFlow ~> tweetBroadCaster.in
      tweetBroadCaster.out(0) ~> kafkaSink
      tweetBroadCaster.out(1) ~> tweetPrintSink
    }

  def build: ActorRef = runnableByteStringProcessingFlow.withAttributes(
    ActorAttributes.supervisionStrategy(decider)).run()
}

[ERROR] [09/18/2015 10:31:24.636] [ReactiveKafka-akka.actor.default-dispatcher-3] [akka://ReactiveKafka/user/$a] Read 6617 bytes which is more than 180 without seeing a line terminator
akka.stream.io.Framing$FramingException: Read 6617 bytes which is more than 180 without seeing a line terminator
    at akka.stream.io.Framing$DelimiterFramingStage.doParse(Framing.scala:172)
    at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:147)
    at akka.stream.io.Framing$DelimiterFramingStage.onPush(Framing.scala:138)
    at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
    at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
    at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
    at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
    at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
    at akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
    at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
    at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka ...

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
