On Sunday, June 7, 2015 at 6:38:51 PM UTC+3, drewhk wrote:
>
>
>
> On Sun, Jun 7, 2015 at 5:32 PM, Andrey Kuznetsov <[email protected]> wrote:
>
>> auth and session call context.stop(self) in the Cancel handler of 
>> their receive methods. 
>>
>
> umm, but they will not get a Cancel from merge if merge itself is not 
> cancelled, which it will not be in this case. So mtproto gets the 
> completion from the TCP flow, feeds it into merge, but merge will continue 
> happily.
>

umm, I see, now I understand

I rewrote the flow, and now I am observing only TIME_WAIT and no CLOSE_WAIT 
sockets, which is fine. The fixed flow:

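// Runs when the inbound TCP side completes or fails: stop both actors
// so that their sources no longer keep merge (and the connection) open.
val completeSink = Sink.onComplete {
  case x ⇒
    authManager ! PoisonPill
    sessionClient ! PoisonPill
    system.log.debug("Completing {}", x)
}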

Flow() { implicit builder ⇒
  import FlowGraph.Implicits._

  val bcast = builder.add(Broadcast[ByteString](2))
  val merge = builder.add(Merge[MTProto](3))

  val mtproto = builder.add(mtprotoFlow)
  val auth = builder.add(authSource)
  val session = builder.add(sessionClientSource)
  val mapResp = builder.add(mapRespFlow)
  val complete = builder.add(completeSink)

  // format: OFF

  bcast ~> complete
  bcast ~> mtproto ~> merge
           auth    ~> merge
           session ~> merge ~> mapResp

  // format: ON

  (bcast.in, mapResp.outlet)
}
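
To picture why the extra sink helps: as Endre explained, merge keeps running until all of its inputs have completed. Below is a minimal plain-Scala model of that rule. It is not Akka code; MergeModel and its methods are hypothetical names used only for illustration, and it assumes that stopping the auth and session actors completes their sources.

```scala
// Plain-Scala model of the merge completion rule; this is NOT the Akka API.
// MergeModel, complete and isCompleted are hypothetical names.
final class MergeModel(inputs: Set[String]) {
  // Upstreams that have not completed yet.
  private var open: Set[String] = inputs

  // Mark one upstream as completed, e.g. after the client half-closes.
  def complete(input: String): Unit = open -= input

  // Merge completes downstream only when no upstream is still open.
  def isCompleted: Boolean = open.isEmpty
}

object MergeModelDemo {
  def main(args: Array[String]): Unit = {
    val merge = new MergeModel(Set("mtproto", "auth", "session"))

    merge.complete("mtproto")   // inbound TCP side finished
    println(merge.isCompleted)  // false: auth and session still feed merge

    merge.complete("auth")      // assumed effect of the PoisonPill
    merge.complete("session")   // assumed effect of the PoisonPill
    println(merge.isCompleted)  // true: the outbound side can now complete
  }
}
```

In the original flow only the first step ever happened on a half-close, so the server side never closed and the socket stayed in CLOSE_WAIT.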


Thank you, both @Akka team and @drewhk!
 

>  
>
>> And they see it if client closes connection (at least if it is being 
>> closed by Connection reset by peer).
>>
>
> Yes, because then the TCP flow *cancels* rather than half-closes, i.e. you 
> will get a cancel event coming from the downstream to the upstreams, which 
> in turn causes merge itself and all upstreams to cancel (there is also an 
> onError coming from the upstream direction at the same time). A normal 
> remote client on the other hand comes as a completion event from the 
>
> Anyway, it is easy to check this. Put a println somewhere in the auth or 
> session actor where it handles Cancel, then try a half-closed client 
> connection and see what gets printed.
>
> -Endre
>  
>
>> I tried to send PoisonPill to auth and session actors from 
>> Sink.onComplete and still observed CLOSE_WAIT connections.
>> --
>>  
>> Andrey Kuznetsov
>>   
>>  
>> On Sun, Jun 7, 2015, at 08:07 AM, Endre Varga wrote:
>>
>> Hi Andrey,
>>  
>> When do the auth and session sources finish? Do you see the onComplete 
>> log message from broadcast? Since the connections you see are in 
>> CLOSE_WAIT, that means that your server already received a close from the 
>> client, but it has not yet closed its own side (half-closed). I suspect 
>> that since you have more than one input wired into your merge node, if the 
>> inbound mtproto flow closes, the merge itself continues if the other two 
>> inputs of it are not completing and therefore keeping the connection open.
>>  
>> -Endre
>>   
>> On Sun, Jun 7, 2015 at 4:44 PM, Andrey Kuznetsov <[email protected]> wrote:
>>
>> and mapResponse is a PushStage:
>>  
>>
>> def mapResponse(system: ActorSystem) = new PushStage[MTProto, ByteString] {
>>   private[this] var packageIndex: Int = -1
>>
>>   override def onPush(elem: MTProto, ctx: Context[ByteString]) = {
>>     packageIndex += 1
>>     val pkg = TransportPackage(packageIndex, elem)
>>
>>     val resBits = TransportPackageCodec.encode(pkg).require
>>     val res = ByteString(resBits.toByteBuffer)
>>
>>     elem match {
>>       case _: Drop ⇒
>>         ctx.pushAndFinish(res)
>>       case _ ⇒
>>         ctx.push(res)
>>     }
>>   }
>> }
>>
>>
>>   
>>  
>>  
>> On Sunday, June 7, 2015 at 7:03:01 AM UTC+3, Andrey Kuznetsov wrote:
>>
>> I am using akka-streams for handling incoming TCP connections. After 
>> putting the application behind an AWS Elastic Load Balancer, it started 
>> running into the "too many open files" problem because of lots of 
>> connections in the CLOSE_WAIT state. Is there a way to close such 
>> connections on the server side?
>>
>>  
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>  
>>  
>>  
>>
>>
>
>

