Hi William,
What is the error that you are seeing?
I can't reproduce it with the latest streams code. I tried your sample with 1
to 500 clients in the tight loop, and it only prints Success.
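By the way, the counter would be better as:
import java.util.concurrent.atomic.AtomicInteger
val connectionCounter = new AtomicInteger
val handler = Sink.foreach[Tcp.IncomingConnection] { conn ⇒
  val c = connectionCounter.incrementAndGet() // atomic, safe across threads
  // ... rest of the handler unchanged
}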
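To illustrate why the atomic counter is the safer choice, here is a minimal standalone sketch using only the Scala standard library. It is not taken from the gist: the names CounterSketch and distinctIds are hypothetical, and plain Futures stand in for concurrently arriving connections. Each caller gets a distinct id because incrementAndGet() is a single atomic operation.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CounterSketch {
  // Hypothetical helper: lets `tasks` concurrent callers each draw an id
  // from a shared AtomicInteger and returns how many distinct ids were seen.
  def distinctIds(tasks: Int): Int = {
    val counter = new AtomicInteger
    // Each Future plays the role of one incoming connection.
    val ids = Future.traverse((1 to tasks).toList) { _ =>
      Future(counter.incrementAndGet())
    }
    Await.result(ids, 10.seconds).distinct.size
  }

  def main(args: Array[String]): Unit = {
    // Every caller received a unique id, so this prints 500.
    println(distinctIds(500))
  }
}
```

With a plain var and += 1 the increment is a read followed by a write, so two threads can observe the same value; that is the case the AtomicInteger avoids.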
/Patrik
On Wed, Jun 10, 2015 at 8:18 PM, William Le Ferrand <[email protected]>
wrote:
> Hi,
>
> I modified the TcpEcho example to have the client run several queries in a
> tight loop & you can see them fail. With a Thread.sleep(1s) in the client
> loop, queries run perfectly.
>
> The demo is here:
> https://gist.github.com/williamleferrand/3478d9ed00c7b411dada
>
> Would anyone have hints toward a fix? I dug into the akka code but
> couldn't figure out what is going on.
>
> Many thanks!!
>
> William
>
> On Wed, Jun 10, 2015 at 7:26 AM, William Le Ferrand <[email protected]>
> wrote:
>
>> Hi Peter
>>
>> Thanks for the pointer but it doesn't seem to be what's happening. I
>> occasionally get two full queries from the same connection on the server
>> side; they are properly decoded & processed and are equal to what the
>> client sent in two different calls to askServer()
>>
>> Today I'll try to isolate that behavior a little bit more.
>>
>> Thanks
>>
>> william
>>
>> On Tue, Jun 9, 2015 at 9:31 PM, Peter Swift <[email protected]>
>> wrote:
>>
>>> I bet your request is occasionally fragmented on the TCP layer. Here's
>>> some background:
>>> http://blog.stephencleary.com/2009/04/message-framing.html
>>> There is a recipe in the bidi flow graph section for length-based
>>> framing:
>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-graphs.html
>>>
>>> Cheers,
>>> Peter
>>>
>>>
>>> On Tuesday, June 9, 2015 at 3:19:01 PM UTC-7, William Le Ferrand wrote:
>>>>
>>>> Dear List,
>>>>
>>>> I've a little app that uses (the awesome) akka-streams over TCP.
>>>>
>>>> The client sends a request by doing
>>>>
>>>> def askServer() =
>>>>   Source(query)
>>>>     .via(Tcp().outgoingConnection(serverAddress))
>>>>     .runForeach(processOneChunk)
>>>>     .onComplete(completionHandler)
>>>>
>>>> and the server accepts connections and processes each query this way:
>>>>
>>>> val handler = Sink.foreach[Tcp.IncomingConnection] { connection =>
>>>>   val flow = someFlow
>>>>   connection.handleWith(flow)
>>>> }
>>>> val connections = Tcp().bind(hostName, port)
>>>>
>>>> val binding = connections.to(handler).run()
>>>>
>>>>
>>>> It works 99% of the time, but under heavy load two calls to askServer
>>>> sometimes get pipelined into the same Tcp.IncomingConnection, meaning
>>>> that a given "someFlow" now has two inbound "queries".
>>>>
>>>> Is this expected? The second query fails every time because the TCP
>>>> connection is closed too early.
>>>>
>>>> Many thanks in advance for your help, and keep up the good work!!
>>>>
>>>> William
>>>>
>>>>
>>>> Mobile : (+1) (415) 683-1484
>>>> Web : http://williamleferrand.github.com/
>>>> <http://www.linkedin.com/in/williamleferrand>
>>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>
--
Patrik Nordwall
Typesafe <http://typesafe.com/> - Reactive apps on the JVM
Twitter: @patriknw