Hi Chris,

Here is the JIRA: https://issues.apache.org/jira/browse/SAMZA-513
Also here is the final working shutdown hook code and example logs based on your workaround: https://gist.github.com/zcox/f80fa8957c4964e8ebc0

I am totally OK with somewhat horrific hacks in this shutdown code. :)

Thanks again for all of your help!

-Zach

On Thu, Jan 15, 2015 at 5:45 PM, Chris Riccomini <[email protected]> wrote:

> Hey Zach,
>
> Awesome! Could you open a Samza JIRA with a brief description, and point
> to this mailing list conversation?
>
> > I'm assuming this will be fixed in 0.9.0 - any targets yet for when that
> >release might be out? Just trying to plan the best way to get this fix.
>
> A Kafka dev is looking at the problem right now, and agrees with me that
> it looks like SimpleConsumer can swallow these exceptions. He's going to
> 1) try to reproduce, and 2) open a Kafka JIRA if he can. The problem is,
> since the fix will have to be in Kafka, not Samza, it could take some
> time. Kafka is going through an 0.8.2 release vote right now, which means
> a fix wouldn't be released until 0.8.3, which could be a few months out.
>
> I'd plan for probably a quarter, at least, before a fix is ready. :(
> Sorry! Having to run with a forked Samza codebase isn't really desirable.
> An alternative, somewhat horrific hack, would be to modify your shutdown
> hook code to:
>
> 1. Get a list of all threads [1].
> 2. Iterate over threads looking for Thread.getName() with the
> BROKER_PROXY_THREAD_NAME_PREFIX ("BROKER-PROXY-")
> 3. Interrupt these threads in a tight loop.
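
The three steps above can be sketched in Scala roughly as follows. This is a minimal sketch, not the code from the gist: the names `BrokerProxyInterrupter`, `brokerProxyThreads`, and `interruptUntilDead` are hypothetical, and the timeout is an added assumption so that the loop cannot spin forever. The match uses `contains` rather than a strict prefix test because the thread names that appear in the logs in this thread begin with "SAMZA-BROKER-PROXY-".

```scala
object BrokerProxyInterrupter {
  // Marker taken from the thread; matched with `contains` (an assumption)
  // because logged names look like "SAMZA-BROKER-PROXY-BrokerProxy thread ...".
  val BrokerProxyMarker = "BROKER-PROXY-"

  /** Step 1 and 2: snapshot all live threads and keep those whose name matches. */
  def brokerProxyThreads(marker: String = BrokerProxyMarker): Seq[Thread] =
    Thread.getAllStackTraces.keySet
      .toArray(new Array[Thread](0))
      .filter(t => t.isAlive && t.getName.contains(marker))
      .toSeq

  /** Step 3: interrupt the matching threads in a tight loop until they have
    * all exited or the (hypothetical) timeout elapses.
    * Returns the number of interrupts that were sent. */
  def interruptUntilDead(marker: String = BrokerProxyMarker,
                         timeoutMs: Long = 30000L): Long = {
    val deadline = System.currentTimeMillis + timeoutMs
    var count = 0L
    var alive = brokerProxyThreads(marker)
    while (alive.nonEmpty && System.currentTimeMillis < deadline) {
      alive.foreach { t => t.interrupt(); count += 1 }
      alive = alive.filter(_.isAlive)
    }
    count
  }
}
```

Called from a shutdown hook, `BrokerProxyInterrupter.interruptUntilDead()` keeps interrupting until the matching threads are gone; the returned count plays the same role as the "Took $count interrupts" log line in the patched `stop` method.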
>
> Cheers,
> Chris
>
> [1]
> http://stackoverflow.com/questions/1323408/get-a-list-of-all-threads-currently-running-in-java
>
> On 1/15/15 2:19 PM, "Zach Cox" <[email protected]> wrote:
>
> >Hi Chris - I managed to get Samza 0.8.0 building locally with this change
> >in BrokerProxy:
> >
> > def stop {
> >   info("Shutting down " + toString)
> >
> >   var count = 0
> >   while(thread.isAlive) {
> >     thread.interrupt
> >     count += 1
> >   }
> >   info(s"Took $count interrupts")
> >   thread.join
> > }
> >
> >So far I've tried 5 times, and shutdowns worked every time! Here is an
> >example of logs I see now:
> >
> >https://gist.githubusercontent.com/zcox/2c638b2d204b61dcce35/raw/9380499a192db7ba0d0bdb0c2330e6354730f8c8/gistfile1.txt
> >
> >The number of interrupts in each of the 5 runs were: 345787, 2275, 1699,
> >47574 and 4996.
> >
> >Seems like that was the problem, since it takes so many interrupts before
> >isAlive returns false.
> >
> >I'm assuming this will be fixed in 0.9.0 - any targets yet for when that
> >release might be out? Just trying to plan the best way to get this fix.
> >
> >Thanks,
> >Zach
> >
> >
> >On Thu, Jan 15, 2015 at 2:31 PM, Chris Riccomini <
> >[email protected]> wrote:
> >
> >> Hey Zach,
> >>
> >> Cool. I've engaged with some devs to understand if SimpleConsumer might be
> >> swallowing the interrupt.
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 1/15/15 12:29 PM, "Zach Cox" <[email protected]> wrote:
> >>
> >> >Hi Chris,
> >> >
> >> >Here is an example of logs when I see proper shutdown:
> >> >
> >> >https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
> >> >
> >> >That does show the InterruptedException "Got interrupt exception in broker
> >> >proxy thread." line.
> >> >
> >> >Here's java -version output on the Samza container:
> >> >
> >> >java version "1.6.0_33" OpenJDK Runtime Environment (IcedTea6 1.13.5)
> >> >(6b33-1.13.5-1ubuntu0.14.04) OpenJDK 64-Bit Server VM (build 23.25-b01,
> >> >mixed mode)
> >> >
> >> >Currently all of this stuff (Mesos, Kafka, Samza, etc) is running on my
> >> >laptop.
> >> >
> >> >I've just been using the released Samza 0.8.0 jars from maven central -
> >> >will try to get Samza building locally and add that tight interrupt loop,
> >> >and see what happens.
> >> >
> >> >Thanks,
> >> >Zach
> >> >
> >> >
> >> >On Thu, Jan 15, 2015 at 1:11 PM, Chris Riccomini <
> >> >[email protected]> wrote:
> >> >
> >> >> Hey Zach,
> >> >>
> >> >> Can you try modifying this line in BrokerProxy:
> >> >>
> >> >> https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L283
> >> >>
> >> >> To interrupt a lot in a tight loop and continue checking the thread
> >> >> status? Something like:
> >> >>
> >> >> while(thread.isAlive) {
> >> >>   thread.interrupt
> >> >> }
> >> >>
> >> >> I want to see if we can catch the thread at a point when it's sleeping,
> >> >> not doing IO on the wire. If we can, the thread should shutdown cleanly,
> >> >> and we should see the "Got interrupt exception in broker proxy thread."
> >> >> exception.
> >> >>
> >> >> Cheers,
> >> >> Chris
> >> >>
> >> >> On 1/15/15 11:01 AM, "Chris Riccomini" <[email protected]> wrote:
> >> >>
> >> >> >Hey Zach,
> >> >> >
> >> >> >I'm beginning to wonder if this line is the problem:
> >> >> >
> >> >> >https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
> >> >> >
> >> >> >If the sendRequest in SimpleConsumer catches a Throwable, it will catch
> >> >> >ClosedByInterruptException.
> >> >> >This will cause the interrupt flag to get reset, I think. If the
> >> >> >consumer then retries the send, and it succeeds, then I think it will
> >> >> >continue on and ignore the interrupt. Going to poke a Kafka dev to see
> >> >> >if I am misunderstanding something.
> >> >> >
> >> >> >Cheers,
> >> >> >Chris
> >> >> >
> >> >> >On 1/15/15 10:52 AM, "Chris Riccomini" <[email protected]> wrote:
> >> >> >
> >> >> >>Hey Zach,
> >> >> >>
> >> >> >>Hmm. It sounds like something is catching the InterruptedException, and
> >> >> >>letting the interrupt flag get reset. If that happens, the
> >> >> >>ExponentialSleepStrategy [1] will continue running.
> >> >> >>
> >> >> >>I walked through the code, but couldn't find where it might be doing so,
> >> >> >>but it's hard to catch these without a test.
> >> >> >>
> >> >> >>> I have never seen that "Restarting consumer due to ..." warn line in
> >> >> >>>the logs.
> >> >> >>
> >> >> >>If that's the case, then I don't think the BrokerProxy can be
> >> >> >>reconnecting since reconnect must be set to true, which would require
> >> >> >>seeing that log line [2].
> >> >> >>
> >> >> >>Out of curiosity, what version of the JVM are you using? Trying to
> >> >> >>replicate this is proving tricky.
> >> >> >>
> >> >> >>I'm beginning to wonder if this has to do with InterruptedException vs.
> >> >> >>ClosedByInterruptException.
> >> >> >>In TestStatefulTask, I only see the first log line:
> >> >> >>
> >> >> >> case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
> >> >> >> case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
> >> >> >>
> >> >> >>This is probably because the test connects to a local Kafka broker, so
> >> >> >>nio socket operations are extremely fast, and the test almost always gets
> >> >> >>interrupted on a Thread.sleep, not a blocking socket operation. I'm
> >> >> >>wondering if a ClosedByInterruptException exception would cause the
> >> >> >>hanging that you're seeing.
> >> >> >>
> >> >> >>
> >> >> >>Out of curiosity, is your container on the same machine as your broker,
> >> >> >>or a different machine?
> >> >> >>
> >> >> >>Cheers,
> >> >> >>Chris
> >> >> >>
> >> >> >>[1]
> >> >> >>https://github.com/apache/incubator-samza/blob/0.8.0/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala#L80
> >> >> >>[2]
> >> >> >>https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L133
> >> >> >>
> >> >> >>On 1/15/15 9:38 AM, "Zach Cox" <[email protected]> wrote:
> >> >> >>
> >> >> >>>Hi Chris,
> >> >> >>>
> >> >> >>>
> >> >> >>>> Looking at your logs (from previous email, granted, different
> >> >> >>>> execution), I see that we get as far as SimpleConsumer.disconnect():
> >> >> >>>>
> >> >> >>>> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy ...]
> >> >> >>>> o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
> >> >> >>>>
> >> >> >>>> This is invoked by BrokerProxy.scala:135.
> >> >> >>>> The only way this should get
> >> >> >>>> invoked is if an exception is caught. Do you see this log line
> >> >> >>>> anywhere?
> >> >> >>>>
> >> >> >>>> warn("Restarting consumer due to %s. Turn on debugging to get a full
> >> >> >>>> stack trace." format exception)
> >> >> >>>>
> >> >> >>>
> >> >> >>>DefaultFetchConsumer.close() calls SimpleConsumer.close(), which calls
> >> >> >>>disconnect(), which also logs that "Disconnecting from localhost:9192"
> >> >> >>>line:
> >> >> >>>
> >> >> >>>https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala#L44
> >> >> >>>
> >> >> >>>https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L50
> >> >> >>>
> >> >> >>>I have never seen that "Restarting consumer due to ..." warn line in the
> >> >> >>>logs.
> >> >> >>>
> >> >> >>>Also note that I'm using Samza 0.8.0, not master.
> >> >> >>>
> >> >> >>>
> >> >> >>>> I ran our TestStatefulTask test a few times, and verified that the
> >> >> >>>> consumer CAN be shutdown:
> >> >> >>>>
> >> >> >>>> [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting
> >> >> >>>> down consumer multiplexer.
> >> >> >>>> [ThreadJob] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting
> >> >> >>>> down BrokerProxy for localhost:59075
> >> >> >>>> 2015-01-15 08:59:26 DefaultFetchSimpleConsumer [WARN] Reconnect due to
> >> >> >>>> socket error: null
> >> >> >>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO
> >> >> >>>> org.apache.samza.system.kafka.BrokerProxy - Got closed by interrupt
> >> >> >>>> exception in broker proxy thread.
> >> >> >>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO
> >> >> >>>> org.apache.samza.system.kafka.BrokerProxy - Shutting down due to
> >> >> >>>> interrupt.
> >> >> >>>> // consumer shutdown is complete here.. The container has moved on to
> >> >> >>>> shutting down the producer
> >> >> >>>> [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting
> >> >> >>>> down producer multiplexer.
> >> >> >>>>
> >> >> >>>
> >> >> >>>Yesterday afternoon I was showing this to a colleague, and we did observe
> >> >> >>>a proper clean shutdown once. Then the next 2 times I showed him we saw
> >> >> >>>the BrokerProxy blocking issue. So I also observe proper shutdown
> >> >> >>>sometimes too, just seems a lot more often that BrokerProxy blocks and it
> >> >> >>>doesn't shut down.
> >> >> >>>
> >> >> >>>Thanks,
> >> >> >>>Zach
> >> >> >>>
> >> >> >>>
> >> >> >>>>
> >> >> >>>> Cheers,
> >> >> >>>> Chris
> >> >> >>>>
> >> >> >>>> On 1/14/15 7:24 PM, "Zach Cox" <[email protected]> wrote:
> >> >> >>>>
> >> >> >>>> >Hi Chris,
> >> >> >>>> >
> >> >> >>>> >I did a thread dump during the shutdown process, when I could tell it
> >> >> >>>> >wasn't shutting down properly [1]. You can see this thread dump in the
> >> >> >>>> >context of the other logging at shutdown.
> >> >> >>>> >
> >> >> >>>> >The main thread is indeed blocked on the Thread.join call in
> >> >> >>>> >BrokerProxy.stop. The BrokerProxy thread looks like it's still
> >> >> >>>> >consuming from Kafka?
> >> >> >>>> >
> >> >> >>>> >The thread dump was a bit tricky; this Samza container is running in a
> >> >> >>>> >Docker container on boot2docker, which is Tiny Core Linux, i.e. no
> >> >> >>>> >jstack.
> >> >> >>>> >I never knew this, but `sudo kill -3 [pid]` tells the jvm to do a thread
> >> >> >>>> >dump to stdout [2] :)
> >> >> >>>> >
> >> >> >>>> >Thanks,
> >> >> >>>> >Zach
> >> >> >>>> >
> >> >> >>>> >[1]
> >> >> >>>> >https://gist.githubusercontent.com/zcox/bb47a61d4ae1acd54056/raw/83224d20a61de20bb47bcf916d92930a71cd97ad/gistfile1.txt
> >> >> >>>> >
> >> >> >>>> >[2]
> >> >> >>>> >http://serverfault.com/questions/574745/jstack-alternative-for-linux-redhat-6
> >> >> >>>> >
> >> >> >>>> >
> >> >> >>>> >On Wed, Jan 14, 2015 at 7:30 PM, Chris Riccomini <
> >> >> >>>> >[email protected]> wrote:
> >> >> >>>> >
> >> >> >>>> >> Hey Zach,
> >> >> >>>> >>
> >> >> >>>> >> It sounds likely that the BrokerProxy thread is blocking or improperly
> >> >> >>>> >> catching an interrupt exception.
> >> >> >>>> >>
> >> >> >>>> >> Can you take a thread dump? My guess is that you'll see that the main
> >> >> >>>> >> thread is blocked on BrokerProxy.stop:
> >> >> >>>> >>
> >> >> >>>> >> thread.interrupt
> >> >> >>>> >> thread.join
> >> >> >>>> >>
> >> >> >>>> >>
> >> >> >>>> >> It'll likely be blocked on thread.join. If that's the case, I'd like to
> >> >> >>>> >> see what the BrokerProxy thread is doing.
> >> >> >>>> >> This line indicates that the
> >> >> >>>> >> thread seems to be shutting down:
> >> >> >>>> >>
> >> >> >>>> >> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy thread
> >> >> >>>> >> pointed at localhost:9192 for client
> >> >> >>>> >> samza_consumer-twitter_message_separation-1-1421255914862-1]
> >> >> >>>> >> o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
> >> >> >>>> >>
> >> >> >>>> >> But if thread.join is blocking, then the shutdown never seems to
> >> >> >>>> >> complete.
> >> >> >>>> >> It'd be good to see where these threads are at.
> >> >> >>>> >>
> >> >> >>>> >>
> >> >> >>>> >> Cheers,
> >> >> >>>> >> Chris
> >> >> >>>> >>
> >> >> >>>> >> On 1/14/15 10:49 AM, "Zach Cox" <[email protected]> wrote:
> >> >> >>>> >>
> >> >> >>>> >> >Hi - related to the discussion yesterday about graceful shutdown [1],
> >> >> >>>> >> >today I can't seem to get the SamzaContainer to actually shut down.
> >> >> >>>> >> >Yesterday I was seeing nice, fast, complete shutdown logs [2]. However,
> >> >> >>>> >> >today the last log line I see is related to shutting down BrokerProxy,
> >> >> >>>> >> >then nothing else happens until the container is finally just SIGKILLed
> >> >> >>>> >> >by Docker [3].
> >> >> >>>> >> >
> >> >> >>>> >> >The consistency with which clean, fast shutdowns were happening
> >> >> >>>> >> >yesterday and now they are never happening today leads me to believe
> >> >> >>>> >> >I've changed something, but I really can't find anything that would
> >> >> >>>> >> >lead to this.
> >> >> >>>> >> >It's almost like there's some deadlock in BrokerProxy, but I would
> >> >> >>>> >> >think that would show up randomly - I've tried this about 100 times
> >> >> >>>> >> >today, and every time the logs are as shown in [3]. Today I also see 2
> >> >> >>>> >> >exceptions in the Kafka broker logs [4] but I don't know if those were
> >> >> >>>> >> >occurring yesterday when Samza was cleanly shutting down, or not.
> >> >> >>>> >> >
> >> >> >>>> >> >One thing that doesn't seem to be happening today is this log line from
> >> >> >>>> >> >BrokerProxy: "Got interrupt exception in broker proxy thread."
> >> >> >>>> >> >
> >> >> >>>> >> >Has anyone seen this, or have any advice on what to try next?
> >> >> >>>> >> >
> >> >> >>>> >> >Thanks,
> >> >> >>>> >> >Zach
> >> >> >>>> >> >
> >> >> >>>> >> >
> >> >> >>>> >> >[1]
> >> >> >>>> >> >http://www.mail-archive.com/[email protected]/msg02246.html
> >> >> >>>> >> >
> >> >> >>>> >> >[2]
> >> >> >>>> >> >https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
> >> >> >>>> >> >
> >> >> >>>> >> >[3]
> >> >> >>>> >> >https://gist.githubusercontent.com/zcox/673a2ba607c566de7650/raw/d29ea8b99868da9cc4dc1db198315ee1d03bc694/gistfile1.txt
> >> >> >>>> >> >
> >> >> >>>> >> >[4]
> >> >> >>>> >> >https://gist.githubusercontent.com/zcox/f08de55c5d5fe2d70cde/raw/c4be453b5f6a57cfb5acb08508667f4b8ce8c2bd/gistfile1.txt
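
The behaviour diagnosed in this thread (a handler that catches the interrupt-related exception and retries, so that a single `thread.interrupt` is lost) can be reproduced in isolation. The sketch below is illustrative only: `SwallowedInterruptDemo`, `worker`, and `stop` are hypothetical names, `Thread.sleep` stands in for the blocking fetch, and the catch-all clause stands in for the one Chris suspects in SimpleConsumer. None of it is Samza or Kafka code.

```scala
object SwallowedInterruptDemo {
  /** A worker that mimics a retrying consumer: a catch-all handler absorbs
    * the first `swallow` interrupts and carries on, and only a later
    * interrupt is allowed to stop the loop. */
  def worker(swallow: Int): Thread = new Thread(new Runnable {
    def run(): Unit = {
      var swallowed = 0
      var running = true
      while (running) {
        try {
          Thread.sleep(50) // stand-in for a blocking fetch
        } catch {
          // Thread.sleep clears the interrupt status when it throws, so
          // after this clause the thread no longer looks interrupted.
          case _: Throwable if swallowed < swallow => swallowed += 1
          case _: InterruptedException => running = false
        }
      }
    }
  }, "demo-worker")

  /** Same shape as the patched BrokerProxy.stop quoted in this thread:
    * keep interrupting until the thread is gone, then join.
    * Returns the number of interrupts that were sent. */
  def stop(t: Thread): Int = {
    var count = 0
    while (t.isAlive) {
      t.interrupt()
      count += 1
    }
    t.join()
    count
  }
}
```

With `worker(3)`, one `interrupt()` followed by `join()` would block indefinitely, which matches the hang described above, while `stop` returns once enough interrupts have landed outside the swallowing window.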
