This part of Register is also relevant:

* @param useResumeWriting If this is set to true then the connection actor
*   will refuse all further writes after issuing a [[CommandFailed]]
*   notification until [[ResumeWriting]] is received. This can
*   be used to implement NACK-based write backpressure.

This implies that if useResumeWriting=false, further writes after a
CommandFailed *will* succeed once the backlog is cleared. Is this
correct?
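
For reference, the recovery sequence discussed in this thread (a failed write, then ResumeWriting, then WritingResumed, then a resend) can be modelled without Akka on the classpath. The sketch below is only a model under stated assumptions: the message types are local stand-ins named after the akka.io.Tcp messages, not the Akka classes themselves, and NackRetry is a hypothetical helper, not part of any library. It assumes the connection was registered with useResumeWriting=true, as the Scaladoc above describes.

```scala
// Local stand-ins for the akka.io.Tcp messages named in this thread.
// They are NOT the Akka types; they only model the message sequence.
sealed trait ToConnection                       // handler -> connection
final case class Write(data: String) extends ToConnection
case object ResumeWriting extends ToConnection

sealed trait FromConnection                     // connection -> handler
final case class CommandFailed(cmd: Write) extends FromConnection
case object WritingResumed extends FromConnection

// Hypothetical helper: for each event from the connection, returns the
// messages the handler should send back to the connection, in order.
final class NackRetry {
  // true between the first CommandFailed and the next WritingResumed
  private var suspended = false
  // writes that were rejected and still have to be resent, oldest first
  private var pending = Vector.empty[Write]

  def onEvent(event: FromConnection): List[ToConnection] = event match {
    case CommandFailed(w) =>
      pending = pending :+ w
      if (suspended) Nil                        // ResumeWriting already requested
      else {
        suspended = true
        List(ResumeWriting)                     // ask the connection to accept writes again
      }

    case WritingResumed =>
      val resend = pending.toList               // replay rejected writes in arrival order
      pending = Vector.empty
      suspended = false
      resend
  }
}
```

In an actor, each message returned by onEvent would be forwarded with `connection ! msg`. Rejected writes are kept in arrival order so that they are resent in the order they were first attempted.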
On Monday, 6 July 2015 11:18:13 UTC+1, Sam Halliday wrote:
>
> Aha! Think I got it.
>
> I need to send ResumeWriting and listen for a WritingResumed, to recover
> from such a situation.
>
> On Monday, 6 July 2015 11:01:23 UTC+1, Sam Halliday wrote:
>>
>> I should say that in every example I have tried, with payloads up to
>> 30MB for the initial un-acked message, that message always arrives at the
>> destination. The subsequent write fails and is retried, but the connection
>> is completely dead and the Ack-ed message can never be sent.
>>
>> On Monday, 6 July 2015 10:59:38 UTC+1, Sam Halliday wrote:
>>>
>>> Dear all,
>>>
>>> I had a few questions for this list last week regarding an unrecoverable
>>> error condition that I was seeing in Wandoulabs WebSockets.
>>>
>>> I have now isolated the problem to standard Scala 2.11.7 / Akka 2.3.11
>>> Tcp.Write with the minimal example at the bottom of this email.
>>>
>>> It seems that sending a ByteString of a moderate size can basically nuke
>>> the network connection.
>>>
>>>
>>> I am very concerned that such unrecoverable errors are possible
>>> (reconnecting would potentially allow sending the failed message, but let's
>>> not consider that a solution).
>>>
>>> What is even more concerning is that I have seen related problems in my
>>> integration tests, where I am using Acking with backpressure everywhere,
>>> but I have been unable to get a reliable reproduction of the problem. Using
>>> Acking seems to mitigate the problem somewhat, but obviously not enough.
>>>
>>> Can somebody please have a look at this and let me know if it is a bug
>>> or if there is some part of the Tcp.Write spec that I failed to grok. Also,
>>> confirming if the problem exists on some network other than mine would be a
>>> good data point. My corporate environment uses PEAP.
>>>
>>>
>>> Best regards,
>>> Sam
>>>
>>>
>>> package testing
>>>
>>> import akka.actor._
>>> import akka.event.LoggingReceive
>>> import akka.io.{ IO, Tcp }
>>> import akka.util.ByteString
>>> import java.net.InetSocketAddress
>>> import java.util.UUID
>>> import concurrent.duration._
>>>
>>> /**
>>> * This is a test of Akka IO to see if the WebSocket behaviour
>>> * described in Buggy is a TCP problem or limited to the WebSocket
>>> * implementation.
>>> *
>>> * Run a blackhole on the target machine, e.g.
>>> *
>>> * nc -k -l 2222 >/dev/null
>>> *
>>> * For a single session, to confirm transmission of payloads:
>>> *
>>> * nc -l 2222 > blackhole
>>> *
>>> * run-main testing.BuggyTcp
>>> *
>>> */
>>> object BuggyTcp extends App {
>>> implicit val system = ActorSystem()
>>>
>>> val remote = new InetSocketAddress("remote-hostname-here", 2222)
>>>
>>> system.actorOf(Props(classOf[BuggyTcp], remote), "client")
>>> }
>>>
>>> class BuggyTcp(remote: InetSocketAddress) extends Actor with ActorLogging {
>>>
>>> import Tcp._
>>> import context.system
>>>
>>> override def preStart(): Unit = {
>>> IO(Tcp) ! Connect(remote)
>>> }
>>>
>>> var connection: ActorRef = _
>>>
>>> object Ack extends Tcp.Event with spray.io.Droppable {
>>> override def toString = "Ack"
>>> }
>>>
>>> def receive = {
>>> case CommandFailed(write @ Tcp.Write(bytes, ack)) =>
>>> log.error(s"failed to write ${ack}")
>>>
>>> // perpetually retry ... does it ever correct itself?
>>> import context.dispatcher
>>> context.system.scheduler.scheduleOnce(1 second, connection, write)
>>>
>>> case c: Connected =>
>>> connection = sender()
>>> connection ! Register(self)
>>> log.info("sending")
>>>
>>> // works
>>> //connection ! Tcp.Write(ByteString("A" * 30000), NoAck("Big thing"))
>>> //connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack)
>>>
>>> // never recovers
>>> connection ! Tcp.Write(ByteString("A" * 300000), NoAck("Big thing"))
>>> connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack)
>>>
>>> case Ack =>
>>> system.shutdown()
>>> case _: ConnectionClosed =>
>>> system.shutdown()
>>>
>>> case msg =>
>>> // WORKAROUND https://github.com/akka/akka/issues/17898
>>> // (can't use LoggingReceive)
>>> log.info(s"got a ${msg.getClass.getName}")
>>>
>>> }
>>>
>>> }
>>>
>>>