Hi (I apologize in advance for all the screenshots, just the easiest way to relay information here)
To be more precise, the problem appears to be happening inside the removed experimental Pipeline code. I understand this was removed and isn't supported, but I'm putting this out there in the hope that someone else has run into this and come up with a solution, or can think of one.

In short, I believe some (corrupted?) data is causing LengthFieldFrame.extractFrames to recurse indefinitely, filling the heap with scala.collection.immutable.$colon$colon instances (~79 million on my 2GB heap).

I was finally able to catch this happening while monitoring with jvisualvm:

http://i.imgur.com/mzWwZAN.png

As you can see, this happens very suddenly. Inspecting the heap dump in MAT shows the very helpful dominator tree:

http://i.imgur.com/Sza07rL.png

It shows the usage is confined to one thread. I was able to confirm in my logging that this thread completed some work successfully right before the problem started (time of the problem judged by the GC logging) and was never seen again (endlessly recursing). This very large List contains only the *same empty* ByteString (based on spot checking the heap dump in jvisualvm).
I was also able to get 2 thread dumps while this was happening:

    "CollectorActorSystem-akka.actor.default-dispatcher-3" prio=10 tid=0x0000000001d44800 nid=0x17d5 runnable [0x00007fd770f0b000]
       java.lang.Thread.State: RUNNABLE
        at akka.util.ByteIterator$ByteArrayIterator$.apply(ByteIterator.scala:25)
        at akka.util.ByteString$ByteString1.iterator(ByteString.scala:138)
        at akka.util.ByteString$ByteString1.iterator(ByteString.scala:131)
        at akka.io.LengthFieldFrame$$anon$3.akka$io$LengthFieldFrame$$anon$$extractFrames(Pipelines.scala:986)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1026)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1024)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8.akka$io$PipelineStage$$anon$$anon$$loopRight(Pipelines.scala:667)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.io.PipelineFactory$$anon$6.injectEvent(Pipelines.scala:336)
        at akka.io.TcpPipelineHandler$$anonfun$receive$1.applyOrElse(TcpPipelineHandler.scala:152)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.io.TcpPipelineHandler.aroundReceive(TcpPipelineHandler.scala:121)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

and

    "CollectorActorSystem-akka.actor.default-dispatcher-3" prio=10 tid=0x0000000001d44800 nid=0x17d5 waiting on condition [0x00007fd770f0b000]
       java.lang.Thread.State: RUNNABLE
        at akka.util.ByteIterator.getLongPart(ByteIterator.scala:532)
        at akka.io.LengthFieldFrame$$anon$3.akka$io$LengthFieldFrame$$anon$$extractFrames(Pipelines.scala:986)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1026)
        at akka.io.LengthFieldFrame$$anon$3$$anonfun$eventPipeline$1.apply(Pipelines.scala:1024)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8.akka$io$PipelineStage$$anon$$anon$$loopRight(Pipelines.scala:667)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineStage$$anon$7$$anon$8$$anonfun$14.apply(Pipelines.scala:680)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at akka.io.PipelineFactory$$anon$6$$anonfun$12.apply(Pipelines.scala:336)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.io.PipelineFactory$$anon$6.injectEvent(Pipelines.scala:336)
        at akka.io.TcpPipelineHandler$$anonfun$receive$1.applyOrElse(TcpPipelineHandler.scala:152)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.io.TcpPipelineHandler.aroundReceive(TcpPipelineHandler.scala:121)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Here is the relevant code:

    @tailrec
    def extractFrames(bs: ByteString, acc: List[ByteString]) //
    : (Option[ByteString], Seq[ByteString]) = {
      if (bs.isEmpty) {
        (None, acc)
      } else if (bs.length < headerSize) {
        (Some(bs.compact), acc)
      } else {
        val length = bs.iterator.getLongPart(headerSize).toInt
        if (length < 0 || length > maxSize)
          throw new IllegalArgumentException(
            s"received too large frame of size $length (max = $maxSize)")
        val total = if (lengthIncludesHeader) length else length + headerSize
        if (bs.length >= total) {
          extractFrames(bs drop total, bs.slice(headerSize, total) :: acc)
        } else {
          (Some(bs.compact), acc)
        }
      }
    }

My maxSize is set to 10000. Looking further at the only instance of ByteIterator in that thread I see:

http://i.imgur.com/G6Yz7at.png

And looking into the referenced array I see:

http://i.imgur.com/EYLPhv6.png

Could it be that the first 4 bytes (197, 198, 199, 200) are where it's looking for the header, and that in this case the header is (incorrectly) set to 0? The extractFrames function doesn't appear to cater for this unusual situation: I think bs.slice(4, 0) would return an empty ByteString. If lengthIncludesHeader is true, total would then also be 0, so bs drop total would return bs unchanged and the recursion would never terminate. If this is the case, it could explain what I'm seeing here.

Does my assessment seem reasonable? Does anyone have any other suggestions/thoughts?

Akka Team, I'm a bit between a rock and a hard place here. I haven't looked at the forthcoming Streams yet. Does it already have the functionality to build services for length-field-framed protocols like this? How far off is a stable release?
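For reference, the kind of guard I have in mind could be sketched roughly as follows. This is a standalone approximation, not the Akka code: it uses Vector[Byte] in place of ByteString, assumes a 4-byte big-endian header with lengthIncludesHeader = true, and the FrameSketch object and readLength helper are names made up for illustration. The only intended behavioural change is rejecting any frame whose total size is smaller than the header, so that every recursive call consumes at least headerSize bytes.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import scala.annotation.tailrec

object FrameSketch {
  // Hypothetical stand-ins for the LengthFieldFrame settings (assumptions, not the Akka API).
  val headerSize = 4
  val maxSize = 10000
  val lengthIncludesHeader = true

  // Decode the length header; big-endian byte order is an assumption.
  def readLength(bs: Vector[Byte]): Int =
    ByteBuffer.wrap(bs.take(headerSize).toArray).order(ByteOrder.BIG_ENDIAN).getInt

  @tailrec
  def extractFrames(
      bs: Vector[Byte],
      acc: List[Vector[Byte]]): (Option[Vector[Byte]], List[Vector[Byte]]) =
    if (bs.isEmpty) {
      (None, acc)
    } else if (bs.length < headerSize) {
      (Some(bs), acc)
    } else {
      val length = readLength(bs)
      if (length < 0 || length > maxSize)
        throw new IllegalArgumentException(
          s"received too large frame of size $length (max = $maxSize)")
      val total = if (lengthIncludesHeader) length else length + headerSize
      // Added guard: a total smaller than the header would make `bs drop total`
      // consume fewer bytes than the header itself (none at all for total == 0),
      // which is the suspected cause of the endless recursion.
      if (total < headerSize)
        throw new IllegalArgumentException(
          s"frame length $length is smaller than the header ($headerSize bytes)")
      if (bs.length >= total) {
        // Frames are accumulated in reverse order, as in the quoted code.
        extractFrames(bs.drop(total), bs.slice(headerSize, total) :: acc)
      } else {
        (Some(bs), acc)
      }
    }
}
```

With this guard, a zero header such as Vector[Byte](0, 0, 0, 0, 1, 2) fails fast with an IllegalArgumentException instead of looping, while a well-formed frame such as Vector[Byte](0, 0, 0, 6, 1, 2) is still extracted as before.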
My preference would be to fix Pipelines if possible and then move to Streams when it's ready & appropriate. As a very last option I could look at moving this to Netty but I'd rather not tackle that challenge if avoidable (would have to learn from scratch).

Many thanks
Peter
