Hi

(I apologize in advance for all the screenshots; they are just the easiest 
way to relay the information here.)

To be more precise, the problem appears to be happening inside the removed 
experimental Pipeline code. I understand this was removed and isn't 
supported, but I'm putting this out there in the hope that someone else 
has run into this and come up with a solution, or can think of one. 

In short I believe some (corrupted?) data is 
causing LengthFieldFrame.extractFrames to recurse indefinitely, causing the 
heap to be filled with scala.collection.immutable.$colon$colon instances 
(~79 million on my 2GB heap). I was finally able to catch this happening 
while monitoring with jvisualvm:

http://i.imgur.com/mzWwZAN.png

As you can see this happens very suddenly. Inspecting the heap dump in MAT 
shows the very helpful dominator tree:

http://i.imgur.com/Sza07rL.png

It shows the usage is confined to one thread. I was able to confirm in my 
logging that this thread completed some work successfully right before the 
problem started (timing judged from the GC logging) and was never seen 
again (presumably because it was endlessly recursing). Spot checks of the 
heap dump in jvisualvm show that this very large List contains only the 
*same empty* ByteString. 

I was also able to get 2 thread dumps while this was happening:

"CollectorActorSystem-akka.actor.default-dispatcher-3" prio=10 
tid=0x0000000001d44800 nid=0x17d5 runnable [0x00007fd770f0b000]
   java.lang.Thread.State: RUNNABLE
        at akka.util.ByteIterator$ByteArrayIterator$.apply(ByteIterator.scala:25)
        at akka.util.ByteString$ByteString1.iterator(ByteString.scala:138)
        at akka.util.ByteString$ByteString1.iterator(ByteString.scala:131)
        at akka.io.LengthFieldFrame$$anon$3.akka$io$LengthFieldFrame$$anon$$extractFrames(Pipelines.scala:986)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1026)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1024)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8.akka$io$PipelineStage$$anon$$anon$$loopRight(Pipelines.scala:667)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.io.PipelineFactory$$anon$6.injectEvent(Pipelines.scala:336)
        at akka.io.TcpPipelineHandler$$anonfun$receive$1.applyOrElse(TcpPipelineHandler.scala:152)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.io.TcpPipelineHandler.aroundReceive(TcpPipelineHandler.scala:121)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

and

"CollectorActorSystem-akka.actor.default-dispatcher-3" prio=10 
tid=0x0000000001d44800 nid=0x17d5 waiting on condition [0x00007fd770f0b000]
   java.lang.Thread.State: RUNNABLE
        at akka.util.ByteIterator.getLongPart(ByteIterator.scala:532)
        at akka.io.LengthFieldFrame$$anon$3.akka$io$LengthFieldFrame$$anon$$extractFrames(Pipelines.scala:986)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1026)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1024)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8.akka$io$PipelineStage$$anon$$anon$$loopRight(Pipelines.scala:667)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.io.PipelineFactory$$anon$6.injectEvent(Pipelines.scala:336)
        at akka.io.TcpPipelineHandler$$anonfun$receive$1.applyOrElse(TcpPipelineHandler.scala:152)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.io.TcpPipelineHandler.aroundReceive(TcpPipelineHandler.scala:121)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Here is the relevant code:

      @tailrec
      def extractFrames(bs: ByteString, acc: List[ByteString]) //
      : (Option[ByteString], Seq[ByteString]) = {
        if (bs.isEmpty) {
          (None, acc)
        } else if (bs.length < headerSize) {
          (Some(bs.compact), acc)
        } else {
          val length = bs.iterator.getLongPart(headerSize).toInt
          if (length < 0 || length > maxSize)
            throw new IllegalArgumentException(
              s"received too large frame of size $length (max = $maxSize)")
          val total = if (lengthIncludesHeader) length else length + headerSize
          if (bs.length >= total) {
            extractFrames(bs drop total, bs.slice(headerSize, total) :: acc)
          } else {
            (Some(bs.compact), acc)
          }
        }
      }

My maxSize is set to 10000. 

Looking further at the only instance of ByteIterator in that thread I see:

http://i.imgur.com/G6Yz7at.png

And looking into the referenced array I see: 

http://i.imgur.com/EYLPhv6.png

Could it be that the first 4 bytes (197, 198, 199, 200) are where it's 
looking for the header, and that in this case the length is (incorrectly) 
set to 0? The extractFrames function doesn't appear to cater for this 
unusual situation. If lengthIncludesHeader is true, total would then be 0, 
so bs drop total would return bs unchanged, and I think bs.slice(4, 0) 
would return an empty ByteString that gets prepended to acc on every pass. 
If this is the case, it could explain what I'm seeing here. 
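
The suspicion above can be checked without Akka on the classpath. What 
follows is a minimal sketch, not the original Pipelines code: Vector[Byte] 
stands in for ByteString, readLength is a hypothetical helper that assumes a 
big-endian header, and lengthIncludesHeader is assumed to be true (with it 
set to false, total is always at least headerSize, so the loop always 
advances). The only change to the logic is a guard that rejects any total 
smaller than the header, which is the case where bs drop total stops 
consuming input.

```scala
import scala.annotation.tailrec

// Stand-in for the extractFrames logic quoted above; NOT the Akka source.
object FrameSketch {
  // Assumption: Vector[Byte] replaces akka.util.ByteString for this sketch.
  type Bytes = Vector[Byte]

  val headerSize = 4
  val maxSize = 10000
  val lengthIncludesHeader = true // assumption, not stated in the post

  // Hypothetical helper: decode the first headerSize bytes as a big-endian Int.
  def readLength(bs: Bytes): Int =
    bs.take(headerSize).foldLeft(0)((acc, b) => (acc << 8) | (b & 0xff))

  @tailrec
  def extractFrames(bs: Bytes, acc: List[Bytes]): (Option[Bytes], Seq[Bytes]) =
    if (bs.isEmpty) (None, acc)
    else if (bs.length < headerSize) (Some(bs), acc)
    else {
      val length = readLength(bs)
      if (length < 0 || length > maxSize)
        throw new IllegalArgumentException(
          s"received too large frame of size $length (max = $maxSize)")
      val total = if (lengthIncludesHeader) length else length + headerSize
      // Added guard: a total below headerSize cannot describe a valid frame,
      // and a total of 0 would make bs.drop(total) return bs unchanged.
      if (total < headerSize)
        throw new IllegalArgumentException(
          s"received frame of size $total, smaller than the $headerSize byte header")
      if (bs.length >= total)
        extractFrames(bs.drop(total), bs.slice(headerSize, total) :: acc)
      else (Some(bs), acc)
    }

  def main(args: Array[String]): Unit = {
    val zeroHeader: Bytes = Vector[Byte](0, 0, 0, 0, 1, 2, 3)
    // The two facts behind the runaway loop: neither call consumes any input.
    println(zeroHeader.drop(0) == zeroHeader)
    println(zeroHeader.slice(headerSize, 0).isEmpty)
    // With the guard, the zero header is rejected instead of looping.
    println(scala.util.Try(extractFrames(zeroHeader, Nil)))
  }
}
```

With the guard in place, a zero header fails fast with an 
IllegalArgumentException instead of growing acc until the heap is exhausted. 
Because the function is @tailrec it compiles to a loop, which is why the 
unguarded version would fill the heap with list cells rather than overflow 
the stack. Frames still come back in reverse order, as in the quoted code, 
because each one is prepended to acc.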

Does my assessment seem reasonable? Does anyone have any other 
suggestions/thoughts?

Akka Team, I'm stuck between a rock and a hard place here. I haven't looked 
at the forthcoming Streams yet. Does it already have the functionality to 
build services for length-field-framed protocols like this? How far off is 
a stable release? My preference would be to fix Pipelines if possible and 
then move to Streams when it's ready and appropriate. As a very last option 
I could look at moving this to Netty, but I'd rather not tackle that 
challenge if avoidable (I would have to learn it from scratch). 

Many thanks
Peter
