Hey Zach,

Can you try modifying this line in BrokerProxy:

https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L283

to interrupt repeatedly in a tight loop, checking the thread status each time? Something like:

  while(thread.isAlive) { thread.interrupt }

I want to see if we can catch the thread at a point when it's sleeping, not doing IO on the wire. If we can, the thread should shut down cleanly, and we should see the "Got interrupt exception in broker proxy thread." log line.

Cheers,
Chris

On 1/15/15 11:01 AM, "Chris Riccomini" <[email protected]> wrote:

>Hey Zach,
>
>I'm beginning to wonder if this line is the problem:
>
>https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
>
>If the sendRequest in SimpleConsumer catches a Throwable, it will catch ClosedByInterruptException. This will cause the interrupt flag to get reset, I think. If the consumer then retries the send, and it succeeds, then I think it will continue on and ignore the interrupt. Going to poke a Kafka dev to see if I am misunderstanding something.
>
>Cheers,
>Chris
>
>On 1/15/15 10:52 AM, "Chris Riccomini" <[email protected]> wrote:
>
>>Hey Zach,
>>
>>Hmm. It sounds like something is catching the InterruptedException and letting the interrupt flag get reset. If that happens, the ExponentialSleepStrategy [1] will continue running.
>>
>>I walked through the code but couldn't find where it might be doing so, though it's hard to catch these without a test.
>>
>>> I have never seen that "Restarting consumer due to ..." warn line in the logs.
>>
>>If that's the case, then I don't think the BrokerProxy can be reconnecting, since reconnect must be set to true, which would require seeing that log line [2].
>>
>>Out of curiosity, what version of the JVM are you using? Trying to replicate this is proving tricky.
>>
>>I'm beginning to wonder if this has to do with InterruptedException vs. ClosedByInterruptException.
>>In TestStatefulTask, I only see the first of these two log lines:
>>
>>  case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
>>  case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
>>
>>This is probably because the test connects to a local Kafka broker, so NIO socket operations are extremely fast, and the test almost always gets interrupted on a Thread.sleep, not a blocking socket operation. I'm wondering if a ClosedByInterruptException would cause the hanging that you're seeing.
>>
>>Out of curiosity, is your container on the same machine as your broker, or a different machine?
>>
>>Cheers,
>>Chris
>>
>>[1] https://github.com/apache/incubator-samza/blob/0.8.0/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala#L80
>>[2] https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L133
>>
>>On 1/15/15 9:38 AM, "Zach Cox" <[email protected]> wrote:
>>
>>>Hi Chris,
>>>
>>>> Looking at your logs (from previous email, granted, different execution), I see that we get as far as SimpleConsumer.disconnect():
>>>>
>>>> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy ...] o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
>>>>
>>>> This is invoked by BrokerProxy.scala:135. The only way this should get invoked is if an exception is caught. Do you see this log line anywhere?
>>>>
>>>> warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace."
>>>> format exception)
>>>
>>>DefaultFetchSimpleConsumer.close() calls SimpleConsumer.close(), which calls disconnect(), which also logs that "Disconnecting from localhost:9192" line:
>>>
>>>https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala#L44
>>>
>>>https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L50
>>>
>>>I have never seen that "Restarting consumer due to ..." warn line in the logs.
>>>
>>>Also note that I'm using Samza 0.8.0, not master.
>>>
>>>> I ran our TestStatefulTask test a few times, and verified that the consumer CAN be shut down:
>>>>
>>>> [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down consumer multiplexer.
>>>> [ThreadJob] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down BrokerProxy for localhost:59075
>>>> 2015-01-15 08:59:26 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Got closed by interrupt exception in broker proxy thread.
>>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down due to interrupt.
>>>> // consumer shutdown is complete here. The container has moved on to shutting down the producer
>>>> [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down producer multiplexer.
>>>>
>>>
>>>Yesterday afternoon I was showing this to a colleague, and we did observe a proper clean shutdown once. Then the next 2 times I showed him, we saw the BrokerProxy blocking issue. So I do observe a proper shutdown sometimes too; it just seems that, much more often, BrokerProxy blocks and the container doesn't shut down.
>>>
>>>Thanks,
>>>Zach
>>>
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 1/14/15 7:24 PM, "Zach Cox" <[email protected]> wrote:
>>>>
>>>> >Hi Chris,
>>>> >
>>>> >I did a thread dump during the shutdown process, when I could tell it wasn't shutting down properly [1]. You can see this thread dump in the context of the other logging at shutdown.
>>>> >
>>>> >The main thread is indeed blocked on the Thread.join call in BrokerProxy.stop. The BrokerProxy thread looks like it's still consuming from Kafka?
>>>> >
>>>> >The thread dump was a bit tricky; this Samza container is running in a Docker container on boot2docker, which is Tiny Core Linux, i.e. no jstack. I never knew this, but `sudo kill -3 [pid]` tells the JVM to do a thread dump to stdout [2] :)
>>>> >
>>>> >Thanks,
>>>> >Zach
>>>> >
>>>> >[1] https://gist.githubusercontent.com/zcox/bb47a61d4ae1acd54056/raw/83224d20a61de20bb47bcf916d92930a71cd97ad/gistfile1.txt
>>>> >
>>>> >[2] http://serverfault.com/questions/574745/jstack-alternative-for-linux-redhat-6
>>>> >
>>>> >On Wed, Jan 14, 2015 at 7:30 PM, Chris Riccomini <[email protected]> wrote:
>>>> >
>>>> >> Hey Zach,
>>>> >>
>>>> >> It sounds likely that the BrokerProxy thread is blocking or improperly catching an interrupt exception.
>>>> >>
>>>> >> Can you take a thread dump? My guess is that you'll see that the main thread is blocked on BrokerProxy.stop:
>>>> >>
>>>> >>   thread.interrupt
>>>> >>   thread.join
>>>> >>
>>>> >> It'll likely be blocked on thread.join. If that's the case, I'd like to see what the BrokerProxy thread is doing.
>>>> >> This line indicates that the thread seems to be shutting down:
>>>> >>
>>>> >> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at localhost:9192 for client samza_consumer-twitter_message_separation-1-1421255914862-1] o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
>>>> >>
>>>> >> But if thread.join is blocking, then the shutdown never seems to complete. It'd be good to see where these threads are.
>>>> >>
>>>> >> Cheers,
>>>> >> Chris
>>>> >>
>>>> >> On 1/14/15 10:49 AM, "Zach Cox" <[email protected]> wrote:
>>>> >>
>>>> >> >Hi - related to the discussion yesterday about graceful shutdown [1], today I can't seem to get the SamzaContainer to actually shut down. Yesterday I was seeing nice, fast, complete shutdown logs [2]. However, today the last log line I see is related to shutting down BrokerProxy, then nothing else happens until the container is finally just SIGKILLed by Docker [3].
>>>> >> >
>>>> >> >The consistency with which clean, fast shutdowns were happening yesterday, while they never happen today, leads me to believe I've changed something, but I really can't find anything that would lead to this. It's almost like there's some deadlock in BrokerProxy, but I would think that would show up randomly; I've tried this about 100 times today, and every time the logs are as shown in [3]. Today I also see 2 exceptions in the Kafka broker logs [4], but I don't know whether those were occurring yesterday when Samza was shutting down cleanly.
>>>> >> >
>>>> >> >One thing that doesn't seem to be happening today is this log line from BrokerProxy: "Got interrupt exception in broker proxy thread."
>>>> >> >
>>>> >> >Has anyone seen this, or have any advice on what to try next?
>>>> >> >
>>>> >> >Thanks,
>>>> >> >Zach
>>>> >> >
>>>> >> >[1] http://www.mail-archive.com/[email protected]/msg02246.html
>>>> >> >
>>>> >> >[2] https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
>>>> >> >
>>>> >> >[3] https://gist.githubusercontent.com/zcox/673a2ba607c566de7650/raw/d29ea8b99868da9cc4dc1db198315ee1d03bc694/gistfile1.txt
>>>> >> >
>>>> >> >[4] https://gist.githubusercontent.com/zcox/f08de55c5d5fe2d70cde/raw/c4be453b5f6a57cfb5acb08508667f4b8ce8c2bd/gistfile1.txt
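
The lost-interrupt theory discussed in this thread, and the tight-loop interrupt Chris suggests, can be illustrated with a small self-contained Scala model. This is a sketch under stated assumptions, not Samza or Kafka code. `InterruptSketch`, `clientCall`, `consumerThread` and `interruptUntilDead` are hypothetical names, and `Thread.sleep` stands in for both the blocking fetch and the back-off sleep. One caveat on the model: the `java.nio.channels.InterruptibleChannel` documentation says a thread that receives `ClosedByInterruptException` keeps its interrupt status set. For that reason the sketch models the lost interrupt with `Thread.sleep`, whose `InterruptedException` does clear the status. The deadline in `interruptUntilDead` is an addition to Chris's `while(thread.isAlive) { thread.interrupt }` so the sketch cannot spin forever.

```scala
// Hypothetical model of the shutdown race discussed in this thread.
// None of these names are Samza or Kafka APIs; Thread.sleep stands in for both
// the blocking fetch and the back-off sleep.
object InterruptSketch {

  // Stand-in for a client call that catches Throwable (the real code would retry).
  // Thread.sleep clears the interrupt status when it throws InterruptedException,
  // so unless the handler re-asserts the interrupt, the caller never sees it.
  def clientCall(restoreInterrupt: Boolean): Unit =
    try Thread.sleep(5)
    catch {
      case _: InterruptedException if restoreInterrupt =>
        Thread.currentThread().interrupt() // re-assert so the caller still sees it
      case _: Throwable => () // swallowed: the interrupt is lost
    }

  // Stand-in for the consumer thread: call the client, then back off.
  // Only an InterruptedException thrown by the back-off sleep ends the loop.
  def consumerThread(): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit =
        try {
          while (true) {
            clientCall(restoreInterrupt = false)
            Thread.sleep(5) // back-off sleep: the only place an interrupt stops us
          }
        } catch {
          case _: InterruptedException => () // clean shutdown path
        }
    })
    t.setDaemon(true) // never keep the JVM alive if the sketch misbehaves
    t
  }

  // Interrupt in a tight loop until the thread dies or the deadline passes.
  // Returns true if the thread stopped.
  def interruptUntilDead(t: Thread, timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    while (t.isAlive && System.nanoTime() < deadline) t.interrupt()
    !t.isAlive
  }

  def main(args: Array[String]): Unit = {
    // A swallowed InterruptedException leaves the interrupt status cleared.
    Thread.currentThread().interrupt()
    clientCall(restoreInterrupt = false)
    println(s"interrupted after swallowing call: ${Thread.currentThread().isInterrupted}") // false

    // Re-asserting the interrupt keeps it visible (Thread.interrupted() also clears it).
    Thread.currentThread().interrupt()
    clientCall(restoreInterrupt = true)
    println(s"interrupted after restoring call: ${Thread.interrupted()}") // true

    // Repeated interrupts eventually land while the thread is in its back-off sleep.
    val t = consumerThread()
    t.start()
    println(s"stopped by tight-loop interrupts: ${interruptUntilDead(t, 5000)}")
  }
}
```

In this model a single `interrupt()` stops the thread only if it lands in the back-off sleep rather than inside `clientCall`, which is one way intermittent clean shutdowns like the ones described above could arise. Interrupting repeatedly makes it very likely that one interrupt eventually lands in the back-off sleep. The more robust fix, where the code can be changed, is for the catch-all handler to re-assert the interrupt as `clientCall(restoreInterrupt = true)` does.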
