Hey Zach,

Thanks for filing the JIRA! It looks like we've managed to reproduce the issue on our end. When we open up a JIRA, I'll link it to this ticket, and we can go from there.

It looks like the blockingChannel.receive call is actually swallowing the interrupt entirely. The SimpleConsumer doesn't even seem to catch the throwable; it seems to disappear somewhere in Java's NIO code. Both you and we were using Java 1.6 when we reproduced this. We're going to see if Java 1.8 fixes it.

Cheers,
Chris

On 1/16/15 10:27 AM, "Zach Cox" <[email protected]> wrote:

> Hi Chris,
>
> Here is the jira: https://issues.apache.org/jira/browse/SAMZA-513
>
> Also here is the final working shutdown hook code and example logs based on
> your workaround: https://gist.github.com/zcox/f80fa8957c4964e8ebc0
>
> I am totally OK with somewhat horrific hacks in this shutdown code. :)
>
> Thanks again for all of your help!
> -Zach
>
>
> On Thu, Jan 15, 2015 at 5:45 PM, Chris Riccomini <
> [email protected]> wrote:
>
>> Hey Zach,
>>
>> Awesome! Could you open a Samza JIRA with a brief description, and point
>> to this mailing list conversation?
>>
>>> I'm assuming this will be fixed in 0.9.0 - any targets yet for when that
>>> release might be out? Just trying to plan the best way to get this fix.
>>
>> A Kafka dev is looking at the problem right now, and agrees with me that
>> it looks like SimpleConsumer can swallow these exceptions. He's going to
>> 1) try to reproduce, and 2) open a Kafka JIRA if he can. The problem is,
>> since the fix will have to be in Kafka, not Samza, it could take some
>> time. Kafka is going through an 0.8.2 release vote right now, which means
>> a fix wouldn't be released until 0.8.3, which could be a few months out.
>>
>> I'd plan for probably a quarter, at least, before a fix is ready. :(
>> Sorry! Having to run with a forked Samza codebase isn't really desirable.
>> An alternative, somewhat horrific hack, would be to modify your shutdown
>> hook code to:
>>
>> 1. Get a list of all threads [1].
>> 2. Iterate over threads looking for Thread.getName() with the
>>    BROKER_PROXY_THREAD_NAME_PREFIX ("BROKER-PROXY-").
>> 3. Interrupt these threads in a tight loop.
>>
>> Cheers,
>> Chris
>>
>> [1] http://stackoverflow.com/questions/1323408/get-a-list-of-all-threads-currently-running-in-java
>>
>> On 1/15/15 2:19 PM, "Zach Cox" <[email protected]> wrote:
>>
>>> Hi Chris - I managed to get Samza 0.8.0 building locally with this change
>>> in BrokerProxy:
>>>
>>>   def stop {
>>>     info("Shutting down " + toString)
>>>
>>>     var count = 0
>>>     while(thread.isAlive) {
>>>       thread.interrupt
>>>       count += 1
>>>     }
>>>     info(s"Took $count interrupts")
>>>     thread.join
>>>   }
>>>
>>> So far I've tried 5 times, and shutdowns worked every time! Here is an
>>> example of logs I see now:
>>>
>>> https://gist.githubusercontent.com/zcox/2c638b2d204b61dcce35/raw/9380499a192db7ba0d0bdb0c2330e6354730f8c8/gistfile1.txt
>>>
>>> The number of interrupts in each of the 5 runs was: 345787, 2275, 1699,
>>> 47574 and 4996.
>>>
>>> Seems like that was the problem, since it takes so many interrupts before
>>> isAlive returns false.
>>>
>>> I'm assuming this will be fixed in 0.9.0 - any targets yet for when that
>>> release might be out? Just trying to plan the best way to get this fix.
>>>
>>> Thanks,
>>> Zach
>>>
>>>
>>> On Thu, Jan 15, 2015 at 2:31 PM, Chris Riccomini <
>>> [email protected]> wrote:
>>>
>>>> Hey Zach,
>>>>
>>>> Cool. I've engaged with some devs to understand if SimpleConsumer might be
>>>> swallowing the interrupt.
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 1/15/15 12:29 PM, "Zach Cox" <[email protected]> wrote:
>>>>
>>>>> Hi Chris,
>>>>>
>>>>> Here is an example of logs when I see proper shutdown:
>>>>>
>>>>> https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
>>>>>
>>>>> That does show the InterruptedException "Got interrupt exception in broker
>>>>> proxy thread." line.
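
Chris's three-step workaround (list all threads, match the BrokerProxy thread names, interrupt them in a tight loop) could look roughly like the Scala sketch below. It is not Zach's actual hook code, which is in the gist linked earlier in the thread; the object and method names are hypothetical. It matches on the substring "BROKER-PROXY-" because the log lines in this thread show thread names beginning with "SAMZA-BROKER-PROXY-", and it adds a deadline, which is an assumption and not part of the original suggestion, so the loop cannot spin forever.

```scala
// Hypothetical helper sketching the workaround: enumerate live threads, pick out the
// BrokerProxy threads by name, and interrupt each one in a tight loop until it exits.
object BrokerProxyInterrupter {
  // Assumption: this substring matches names such as
  // "SAMZA-BROKER-PROXY-BrokerProxy thread pointed at localhost:9192 ...".
  val NameFragment = "BROKER-PROXY-"

  // Steps 1 and 2: list every live thread and keep the ones whose name matches.
  def brokerProxyThreads(): Seq[Thread] = {
    val all = Thread.getAllStackTraces.keySet.toArray(new Array[Thread](0))
    all.toSeq.filter(_.getName.contains(NameFragment))
  }

  // Step 3: interrupt each thread repeatedly until it is no longer alive or the
  // deadline passes. Returns the threads that were still alive afterwards.
  def interruptUntilDead(threads: Seq[Thread], timeoutMs: Long): Seq[Thread] = {
    val deadline = System.currentTimeMillis() + timeoutMs
    threads.foreach { t =>
      while (t.isAlive && System.currentTimeMillis() < deadline) t.interrupt()
    }
    threads.filter(_.isAlive)
  }

  // Registers a JVM shutdown hook that runs steps 1-3. Any other shutdown work
  // (stopping the container itself) is deliberately left out of this sketch.
  def install(timeoutMs: Long): Unit =
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      def run(): Unit = {
        val stuck = interruptUntilDead(brokerProxyThreads(), timeoutMs)
        stuck.foreach(t => System.err.println(s"Still alive after interrupts: ${t.getName}"))
      }
    }))
}
```

Calling `BrokerProxyInterrupter.install(10000)` once during startup would register the hook; the 10-second budget is an arbitrary example value, not something taken from this thread.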
>> >> > >> >> >Here's java -version output on the Samza container: >> >> > >> >> >java version "1.6.0_33" OpenJDK Runtime Environment (IcedTea6 >>1.13.5) >> >> >(6b33-1.13.5-1ubuntu0.14.04) OpenJDK 64-Bit Server VM (build >>23.25-b01, >> >> >mixed mode) >> >> > >> >> >Currently all of this stuff (Mesos, Kafka, Samza, etc) is running >>on my >> >> >laptop. >> >> > >> >> >I've just been using the released Samza 0.8.0 jars from maven >>central - >> >> >will try to get Samza building locally and add that tight interrupt >> >>loop, >> >> >and see what happens. >> >> > >> >> >Thanks, >> >> >Zach >> >> > >> >> > >> >> >On Thu, Jan 15, 2015 at 1:11 PM, Chris Riccomini < >> >> >[email protected]> wrote: >> >> > >> >> >> Hey Zach, >> >> >> >> >> >> Can you try modifying this line in BrokerProxy: >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main >> >> >>/s >> >> >> cala/org/apache/samza/system/kafka/BrokerProxy.scala#L283 >> >> >> >> >> >> To interrupt a lot in a tight loop and continue checking the >>thread >> >> >> status? Something like: >> >> >> >> >> >> while(thread.isAlive) { >> >> >> thread.interrupt >> >> >> } >> >> >> >> >> >> I want to see if we can catch the thread at a point when it's >> >>sleeping, >> >> >> not doing IO on the wire. If we can, the thread should shutdown >> >>cleanly, >> >> >> and we should see the "Got interrupt exception in broker proxy >> >>thread." >> >> >> exception. 
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On 1/15/15 11:01 AM, "Chris Riccomini" <[email protected]> wrote:
>>>>>>
>>>>>>> Hey Zach,
>>>>>>>
>>>>>>> I'm beginning to wonder if this line is the problem:
>>>>>>>
>>>>>>> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
>>>>>>>
>>>>>>> If the sendRequest in SimpleConsumer catches a Throwable, it will catch
>>>>>>> ClosedByInterruptException. This will cause the interrupt flag to get
>>>>>>> reset, I think. If the consumer then retries the send, and it succeeds,
>>>>>>> then I think it will continue on and ignore the interrupt. Going to poke a
>>>>>>> Kafka dev to see if I am misunderstanding something.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On 1/15/15 10:52 AM, "Chris Riccomini" <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Zach,
>>>>>>>>
>>>>>>>> Hmm. It sounds like something is catching the InterruptedException, and
>>>>>>>> letting the interrupt flag get reset. If that happens, the
>>>>>>>> ExponentialSleepStrategy [1] will continue running.
>>>>>>>>
>>>>>>>> I walked through the code and couldn't find where it might be doing so,
>>>>>>>> but it's hard to catch these without a test.
>>>>>>>>
>>>>>>>>> I have never seen that "Restarting consumer due to ..." warn line in the
>>>>>>>>> logs.
>>>>>>>>
>>>>>>>> If that's the case, then I don't think the BrokerProxy can be reconnecting,
>>>>>>>> since reconnect must be set to true, which would require seeing that log
>>>>>>>> line [2].
>>>>>>>>
>>>>>>>> Out of curiosity, what version of the JVM are you using? Trying to
>>>>>>>> replicate this is proving tricky.
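
The failure mode described here, where code catches the interrupt and the interrupt status ends up cleared so a sleep/retry loop keeps going, can be shown in isolation. The Scala sketch below is a hypothetical stand-in for a retry loop like ExponentialSleepStrategy, not its actual code. It relies on documented JDK behavior: `Thread.sleep` clears the thread's interrupt status when it throws InterruptedException, so a loop that swallows the exception runs all of its attempts, while one that re-asserts the flag with `Thread.currentThread().interrupt()` stops after the first.

```scala
// Hypothetical retry loop (not Samza's ExponentialSleepStrategy) showing why a
// swallowed InterruptedException lets a loop keep running.
object InterruptFlagDemo {
  // Runs up to maxAttempts iterations, sleeping in each one, and stops early only if
  // the current thread's interrupt status is set. Returns the number of attempts made.
  def retryLoop(maxAttempts: Int, restoreFlag: Boolean): Int = {
    var attempts = 0
    var stop = false
    while (!stop && attempts < maxAttempts) {
      attempts += 1
      try {
        Thread.sleep(10)
      } catch {
        case _: InterruptedException =>
          // Thread.sleep has already cleared the interrupt status at this point;
          // only re-interrupting makes the interrupt visible to the loop condition.
          if (restoreFlag) Thread.currentThread().interrupt()
      }
      stop = Thread.currentThread().isInterrupted
    }
    attempts
  }

  // Runs retryLoop on a fresh thread whose interrupt status is set before the loop starts.
  def runInterrupted(restoreFlag: Boolean): Int = {
    var result = -1
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        Thread.currentThread().interrupt()
        result = retryLoop(maxAttempts = 5, restoreFlag = restoreFlag)
      }
    })
    worker.start()
    worker.join() // join makes the worker's write to result visible here
    result
  }
}
```

With `restoreFlag = false` the interrupt is consumed by the first sleep and the loop runs all 5 attempts; with `restoreFlag = true` it exits after 1.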
>>>>>>>>
>>>>>>>> I'm beginning to wonder if this has to do with InterruptedException vs.
>>>>>>>> ClosedByInterruptException. In TestStatefulTask, I only see the first
>>>>>>>> log line:
>>>>>>>>
>>>>>>>>   case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
>>>>>>>>   case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
>>>>>>>>
>>>>>>>> This is probably because the test connects to a local Kafka broker, so NIO
>>>>>>>> socket operations are extremely fast, and the test almost always gets
>>>>>>>> interrupted on a Thread.sleep, not a blocking socket operation. I'm
>>>>>>>> wondering if a ClosedByInterruptException would cause the hanging that
>>>>>>>> you're seeing.
>>>>>>>>
>>>>>>>> Out of curiosity, is your container on the same machine as your broker, or
>>>>>>>> a different machine?
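
The two `case` clauses above correspond to two different ways a blocked thread can observe an interrupt. The Scala sketch below (object and method names are hypothetical) interrupts one thread blocked in `Thread.sleep` and another blocked reading from an NIO `Pipe`, used here as a stand-in for a socket channel. Per the JDK documentation, the sleeping thread gets an InterruptedException with its interrupt status cleared, while the channel reader gets a ClosedByInterruptException, the channel is closed, and the thread's interrupt status remains set.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ClosedByInterruptException, Pipe}
import java.util.concurrent.CountDownLatch

// Hypothetical demo contrasting the two interrupt paths a BrokerProxy-style thread can hit.
object InterruptKinds {
  // Runs body on a fresh thread, interrupts it once it signals it is about to block,
  // and returns the description the thread produced.
  def interruptWhileBlocked(body: CountDownLatch => String): String = {
    val ready = new CountDownLatch(1)
    var outcome = "none"
    val t = new Thread(new Runnable {
      def run(): Unit = outcome = body(ready)
    })
    t.start()
    ready.await() // the worker is alive and about to block
    t.interrupt()
    t.join()      // join makes the worker's write to outcome visible here
    outcome
  }

  // Blocks in a channel read; nothing is ever written to the pipe.
  def blockedOnChannel(ready: CountDownLatch): String = {
    val pipe = Pipe.open()
    val source = pipe.source()
    try {
      ready.countDown()
      source.read(ByteBuffer.allocate(16))
      "read returned"
    } catch {
      case _: ClosedByInterruptException =>
        s"ClosedByInterruptException, channelOpen=${source.isOpen}, interrupted=${Thread.currentThread().isInterrupted}"
    } finally {
      pipe.sink().close()
      source.close()
    }
  }

  // Blocks in Thread.sleep, like a retry/backoff loop between fetches.
  def blockedOnSleep(ready: CountDownLatch): String =
    try {
      ready.countDown()
      Thread.sleep(60000)
      "sleep returned"
    } catch {
      case _: InterruptedException =>
        s"InterruptedException, interrupted=${Thread.currentThread().isInterrupted}"
    }
}
```

Running `InterruptKinds.interruptWhileBlocked(InterruptKinds.blockedOnChannel)` is expected to report `ClosedByInterruptException, channelOpen=false, interrupted=true`, and the `blockedOnSleep` variant `InterruptedException, interrupted=false`. Whether the interrupt lands before or during the blocking call does not change either outcome.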
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/incubator-samza/blob/0.8.0/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala#L80
>>>>>>>> [2] https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L133
>>>>>>>>
>>>>>>>> On 1/15/15 9:38 AM, "Zach Cox" <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Chris,
>>>>>>>>>
>>>>>>>>>> Looking at your logs (from previous email, granted, different execution),
>>>>>>>>>> I see that we get as far as SimpleConsumer.disconnect():
>>>>>>>>>>
>>>>>>>>>> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy ...]
>>>>>>>>>> o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
>>>>>>>>>>
>>>>>>>>>> This is invoked by BrokerProxy.scala:135. The only way this should get
>>>>>>>>>> invoked is if an exception is caught. Do you see this log line anywhere?
>>>>>>>>>>
>>>>>>>>>>   warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
>>>>>>>>>
>>>>>>>>> DefaultFetchSimpleConsumer.close() calls SimpleConsumer.close(), which
>>>>>>>>> calls disconnect(), which also logs that "Disconnecting from
>>>>>>>>> localhost:9192" line:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala#L44
>>>>>>>>> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L50
>>>>>>>>>
>>>>>>>>> I have never seen that "Restarting consumer due to ..." warn line in the
>>>>>>>>> logs.
>>>>>>>>>
>>>>>>>>> Also note that I'm using Samza 0.8.0, not master.
>>>>>>>>>
>>>>>>>>>> I ran our TestStatefulTask test a few times, and verified that the
>>>>>>>>>> consumer CAN be shut down:
>>>>>>>>>>
>>>>>>>>>> [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down consumer multiplexer.
>>>>>>>>>> [ThreadJob] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down BrokerProxy for localhost:59075
>>>>>>>>>> 2015-01-15 08:59:26 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>>>>>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Got closed by interrupt exception in broker proxy thread.
>>>>>>>>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down due to interrupt.
>>>>>>>>>> // consumer shutdown is complete here. The container has moved on to shutting down the producer
>>>>>>>>>> [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down producer multiplexer.
>>>>>>>>>
>>>>>>>>> Yesterday afternoon I was showing this to a colleague, and we did observe a
>>>>>>>>> proper clean shutdown once. Then the next 2 times I showed him we saw the
>>>>>>>>> BrokerProxy blocking issue. So I also observe proper shutdown sometimes,
>>>>>>>>> it just seems a lot more often that BrokerProxy blocks and it doesn't
>>>>>>>>> shut down.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Zach
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Chris
>>>>>>>>>>
>>>>>>>>>> On 1/14/15 7:24 PM, "Zach Cox" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Chris,
>>>>>>>>>>>
>>>>>>>>>>> I did a thread dump during the shutdown process, when I could tell it
>>>>>>>>>>> wasn't shutting down properly [1]. You can see this thread dump in the
>>>>>>>>>>> context of the other logging at shutdown.
>>>>>>>>>>>
>>>>>>>>>>> The main thread is indeed blocked on the Thread.join call in
>>>>>>>>>>> BrokerProxy.stop. The BrokerProxy thread looks like it's still consuming
>>>>>>>>>>> from Kafka?
>>>>>>>>>>>
>>>>>>>>>>> The thread dump was a bit tricky; this Samza container is running in a
>>>>>>>>>>> Docker container on boot2docker, which is Tiny Core Linux, i.e. no jstack.
>>>>>>>>>>> I never knew this, but `sudo kill -3 [pid]` tells the JVM to do a thread
>>>>>>>>>>> dump to stdout [2] :)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Zach
>>>>>>>>>>>
>>>>>>>>>>> [1] https://gist.githubusercontent.com/zcox/bb47a61d4ae1acd54056/raw/83224d20a61de20bb47bcf916d92930a71cd97ad/gistfile1.txt
>>>>>>>>>>> [2] http://serverfault.com/questions/574745/jstack-alternative-for-linux-redhat-6
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 14, 2015 at 7:30 PM, Chris Riccomini <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Zach,
>>>>>>>>>>>>
>>>>>>>>>>>> It sounds likely that the BrokerProxy thread is blocking or improperly
>>>>>>>>>>>> catching an interrupt exception.
>>>>>>>>>>>>
>>>>>>>>>>>> Can you take a thread dump? My guess is that you'll see that the main
>>>>>>>>>>>> thread is blocked on BrokerProxy.stop:
>>>>>>>>>>>>
>>>>>>>>>>>>   thread.interrupt
>>>>>>>>>>>>   thread.join
>>>>>>>>>>>>
>>>>>>>>>>>> It'll likely be blocked on thread.join. If that's the case, I'd like to
>>>>>>>>>>>> see what the BrokerProxy thread is doing.
>>>>>>>>>>>> This line indicates that the thread seems to be shutting down:
>>>>>>>>>>>>
>>>>>>>>>>>> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy thread
>>>>>>>>>>>> pointed at localhost:9192 for client
>>>>>>>>>>>> samza_consumer-twitter_message_separation-1-1421255914862-1]
>>>>>>>>>>>> o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
>>>>>>>>>>>>
>>>>>>>>>>>> But if thread.join is blocking, then the shutdown never seems to complete.
>>>>>>>>>>>> It'd be good to see where these threads are at.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Chris
>>>>>>>>>>>>
>>>>>>>>>>>> On 1/14/15 10:49 AM, "Zach Cox" <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi - related to the discussion yesterday about graceful shutdown [1],
>>>>>>>>>>>>> today I can't seem to get the SamzaContainer to actually shut down.
>>>>>>>>>>>>> Yesterday I was seeing nice, fast, complete shutdown logs [2]. However,
>>>>>>>>>>>>> today the last log line I see is related to shutting down BrokerProxy,
>>>>>>>>>>>>> then nothing else happens until the container is finally just SIGKILLed
>>>>>>>>>>>>> by Docker [3].
>>>>>>>>>>>>>
>>>>>>>>>>>>> The consistency with which clean, fast shutdowns were happening yesterday
>>>>>>>>>>>>> and now they are never happening today leads me to believe I've changed
>>>>>>>>>>>>> something, but I really can't find anything that would lead to this.
>>>>>>>>>>>>> It's almost like there's some deadlock in BrokerProxy, but I would think
>>>>>>>>>>>>> that would show up randomly - I've tried this about 100 times today, and
>>>>>>>>>>>>> every time the logs are as shown in [3]. Today I also see 2 exceptions in
>>>>>>>>>>>>> the Kafka broker logs [4] but I don't know if those were occurring
>>>>>>>>>>>>> yesterday when Samza was cleanly shutting down, or not.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One thing that doesn't seem to be happening today is this log line from
>>>>>>>>>>>>> BrokerProxy: "Got interrupt exception in broker proxy thread."
>>>>>>>>>>>>>
>>>>>>>>>>>>> Has anyone seen this, or have any advice on what to try next?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Zach
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] http://www.mail-archive.com/[email protected]/msg02246.html
>>>>>>>>>>>>> [2] https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
>>>>>>>>>>>>> [3] https://gist.githubusercontent.com/zcox/673a2ba607c566de7650/raw/d29ea8b99868da9cc4dc1db198315ee1d03bc694/gistfile1.txt
>>>>>>>>>>>>> [4] https://gist.githubusercontent.com/zcox/f08de55c5d5fe2d70cde/raw/c4be453b5f6a57cfb5acb08508667f4b8ce8c2bd/gistfile1.txt
