Hi Reza, At the moment of discussion there was no eagerComplete option. Thanks for mentioning it, i've just got rid of completeSink and everything is working perfectly!
On Friday, 18 November 2016 10:33:19 UTC+3, Reza Samee wrote: > > Hi guys; I'm sorry for replying to this old thread but I'm curious about > this. > I was the same problem (CLOSE_WAIT connections); I searched and found this > (old) thread; but the solution is weird, and not possible for my case, then > I searched and found another/better (in my opinion) solution: I just set > the merge stage to complete if one of the sources completes (don't wait for > all of them): 'builder add Merge[Something](2, true)' > It worked for me. Now I have a question: Why didn't you use of that? Was > this option available in the version that you were using? or it had another > reason? > > On Sunday, June 7, 2015 at 11:18:57 PM UTC+4:30, Andrey Kuznetsov wrote: >> >> On Sunday, June 7, 2015 at 6:38:51 PM UTC+3, drewhk wrote: >>> >>> >>> >>> On Sun, Jun 7, 2015 at 5:32 PM, Andrey Kuznetsov <[email protected]> >>> wrote: >>> >>>> auth and session makes context.stop(self) inside Cancel handler of >>>> their receive's. >>>> >>> >>> umm, but they will not get a Cancel from merge if merge itself is not >>> cancelled, which it will not be in this case. So mtproto gets the >>> completion from the TCP flow, feeds it into merge, but merge will continue >>> happily. >>> >> >> umm, I see, now I understand >> >> I rewrote the flow, and now I observing only TIME_WAIT and no CLOSE_WAIT >> sockets which is pretty ok. The fixed flow: >> >> val completeSink = Sink.onComplete { >> case x ⇒ >> authManager ! PoisonPill >> sessionClient ! PoisonPill >> system.log.debug("Completing {}", x) >> } >> >> Flow() { implicit builder ⇒ >> import FlowGraph.Implicits._ >> >> val bcast = builder.add(Broadcast[ByteString](2)) >> val merge = builder.add(Merge[MTProto](3)) >> >> val mtproto = builder.add(mtprotoFlow) >> val auth = builder.add(authSource) >> val session = builder.add(sessionClientSource) >> val mapResp = builder.add(mapRespFlow) >> val complete = builder.add(completeSink) >> >> // format: OFF >> >> bcast ~> complete >> bcast ~> mtproto ~> merge >> auth ~> merge >> session ~> merge ~> mapResp >> >> // format: ON >> >> (bcast.in, mapResp.outlet) >> } >> >> Enter code here... >> >> Thank you, both @Akka team and @drehk! >> >> >>> >>> >>>> And they see it if client closes connection (at least if it is being >>>> closed by Connection reset by peer). >>>> >>> >>> Yes, because then the TCP flow *cancels* not half-closes, i.e. you will >>> get a cancel event coming from the downstream to the upstreams, which in >>> turn causes merge itself and all upstreams to cancel (there is also an >>> onError coming from the upstream direction at the same time). A normal >>> remote client on the other hand comes as a completion event form the >>> >>> Anyway, it is easy to check this. Put a printline in somewhere the auth >>> or session actor where it gets a Cancel, then try a half-closed client >>> connection and see what gets printed. >>> >>> -Endre >>> >>> >>>> I tried to send PoisonPill to auth and session actors from >>>> Sink.onComplete and still observed CLOSE_WAIT connections. >>>> -- >>>> >>>> Andrey Kuznetsov >>>> >>>> >>>> On Sun, Jun 7, 2015, at 08:07 AM, Endre Varga wrote: >>>> >>>> Hi Andrey, >>>> >>>> When do the auth and session sources finish? Do you see the onComplete >>>> log message from broadcast? Since the connections you see are in >>>> CLOSE_WAIT, that means that your server already received a close from the >>>> client, but it has not yet closed its own side (half-closed). I suspect >>>> that since you have more than one input wired into your merge node, if the >>>> inbound mtproto flow closes, the merge itself continues if the other two >>>> inputs of it are not completing and therefore keeping the connection open. >>>> >>>> -Endre >>>> _ >>>> >>>> On Sun, Jun 7, 2015 at 4:44 PM, Andrey Kuznetsov <[email protected]> >>>> wrote: >>>> >>>> and mapResponse is a PushStage: >>>> >>>> >>>> *def*mapResponse(system:ActorSystem)=*new*PushStage[MTProto,ByteString]{ >>>> *private*[*this*]*var*packageIndex:Int=-1 >>>> >>>> *overridedef*onPush(elem:MTProto,ctx:Context[ByteString])={ >>>> packageIndex +=1 >>>> *val *pkg =TransportPackage(packageIndex,elem) >>>> >>>> *val *resBits =TransportPackageCodec.encode(pkg).require >>>> *val *res =ByteString(resBits.toByteBuffer) >>>> >>>> elem *match *{ >>>> *case*_:Drop⇒ >>>> ctx.pushAndFinish(res) >>>> *case*_ ⇒ >>>> ctx.push(res) >>>> } >>>> } >>>> >>>> >>>> >>>> >>>> >>>> On Sunday, June 7, 2015 at 7:03:01 AM UTC+3, Andrey Kuznetsov wrote: >>>> >>>> I am using akka-streams for handling incoming TCP connections. After >>>> putting application behind AWS Elastic Load Balancer, it started to fall >>>> into "too many open files" problem because of lots of connections in >>>> CLOSE_WAIT state. Is there an ability to close such connection on a server >>>> side? >>>> >>>> >>>> -- >>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>> >>>>>>>>>> Check the FAQ: >>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>> >>>>>>>>>> Search the archives: >>>> https://groups.google.com/group/akka-user >>>> --- >>>> You received this message because you are subscribed to the Google >>>> Groups "Akka User List" group. >>>> To unsubscribe from this group and stop receiving emails from it, send >>>> an email to [email protected]. >>>> To post to this group, send email to [email protected]. >>>> Visit this group at http://groups.google.com/group/akka-user. >>>> For more options, visit https://groups.google.com/d/optout. >>>> >>>> >>>> >>>> >>>> -- >>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>> >>>>>>>>>> Check the FAQ: >>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>> >>>>>>>>>> Search the archives: >>>> https://groups.google.com/group/akka-user >>>> --- >>>> You received this message because you are subscribed to a topic in the >>>> Google Groups "Akka User List" group. >>>> To unsubscribe from this topic, visit >>>> https://groups.google.com/d/topic/akka-user/MC1J4q9lZYw/unsubscribe. >>>> To unsubscribe from this group and all its topics, send an email to >>>> [email protected]. >>>> To post to this group, send email to [email protected]. >>>> Visit this group at http://groups.google.com/group/akka-user. >>>> For more options, visit https://groups.google.com/d/optout. >>>> >>>> >>>> >>>> -- >>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>> >>>>>>>>>> Check the FAQ: >>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>> >>>>>>>>>> Search the archives: >>>> https://groups.google.com/group/akka-user >>>> --- >>>> You received this message because you are subscribed to the Google >>>> Groups "Akka User List" group. >>>> To unsubscribe from this group and stop receiving emails from it, send >>>> an email to [email protected]. >>>> To post to this group, send email to [email protected]. >>>> Visit this group at http://groups.google.com/group/akka-user. >>>> For more options, visit https://groups.google.com/d/optout. >>>> >>> >>> -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
