Hi Chris,

Here is the jira: https://issues.apache.org/jira/browse/SAMZA-513

Also here is the final working shutdown hook code and example logs based on
your workaround: https://gist.github.com/zcox/f80fa8957c4964e8ebc0

I am totally OK with somewhat horrific hacks in this shutdown code. :)

Thanks again for all of your help!
-Zach


On Thu, Jan 15, 2015 at 5:45 PM, Chris Riccomini <[email protected]> wrote:

> Hey Zach,
>
> Awesome! Could you open a Samza JIRA with a brief description, and point
> to this mailing list conversation?
>
> > I'm assuming this will be fixed in 0.9.0 - any targets yet for when that
> >release might be out? Just trying to plan the best way to get this fix.
>
> A Kafka dev is looking at the problem right now, and agrees with me that
> it looks like SimpleConsumer can swallow these exceptions. He's going to
> 1) try to reproduce, and 2) open a Kafka JIRA if he can. The problem is,
> since the fix will have to be in Kafka, not Samza, it could take some
> time. Kafka is going through an 0.8.2 release vote right now, which means
> a fix wouldn't be released until 0.8.3, which could be a few months out.
>
> I'd plan for probably a quarter, at least, before a fix is ready. :(
> Sorry! Having to run with a forked Samza codebase isn't really desirable.
> An alternative, somewhat horrific hack, would be to modify your shutdown
> hook code to:
>
>   1. Get a list of all threads [1].
>   2. Iterate over the threads, looking for Thread.getName() values with the
>      BROKER_PROXY_THREAD_NAME_PREFIX ("BROKER-PROXY-").
>   3. Interrupt these threads in a tight loop.
>
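A rough Scala sketch of those three steps follows. It is not taken from Zach's gist, and the object name, the 10 second deadline, and matching with `contains` instead of a prefix check are all assumptions. The thread names in the logs quoted in this thread look like "SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...", so a strict startsWith("BROKER-PROXY-") test would probably miss them. Registering it as a separate shutdown hook relies on the JVM starting all shutdown hooks concurrently, so it can keep interrupting while another hook is (presumably) stuck in BrokerProxy.stop's join.

```scala
// Hypothetical helper sketching the three-step workaround; not part of Samza's API.
object BrokerProxyInterrupter {
  // Assumption: matches both "SAMZA-BROKER-PROXY-..." and bare "BROKER-PROXY-..." names.
  val BrokerProxyMarker = "BROKER-PROXY-"

  // Step 1: snapshot every live thread in the JVM.
  def allThreads(): Array[Thread] =
    Thread.getAllStackTraces.keySet.toArray(new Array[Thread](0))

  // Step 2: keep only threads whose names mark them as broker proxy threads.
  def brokerProxyThreads(): Array[Thread] =
    allThreads().filter(_.getName.contains(BrokerProxyMarker))

  // Step 3: interrupt each thread in a tight loop until it exits or the shared deadline passes.
  // Returns the total number of interrupts sent.
  def interruptUntilDead(threads: Seq[Thread], timeoutMs: Long): Long = {
    val deadline = System.currentTimeMillis + timeoutMs
    var count = 0L
    for (t <- threads) {
      while (t.isAlive && System.currentTimeMillis < deadline) {
        t.interrupt()
        count += 1
      }
    }
    count
  }

  // Registers the workaround as its own hook; the JVM starts all hooks concurrently.
  def install(timeoutMs: Long = 10000L): Unit =
    Runtime.getRuntime.addShutdownHook(new Thread("shutdown-interrupter") {
      override def run(): Unit = {
        val sent = interruptUntilDead(brokerProxyThreads().toSeq, timeoutMs)
        System.err.println(s"Sent $sent interrupts to broker proxy threads")
      }
    })
}
```

Calling `BrokerProxyInterrupter.install()` early in the container's main method would register it. The deadline keeps the tight loop from spinning forever if a thread never dies.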
> Cheers,
> Chris
>
> [1] http://stackoverflow.com/questions/1323408/get-a-list-of-all-threads-currently-running-in-java
>
> On 1/15/15 2:19 PM, "Zach Cox" <[email protected]> wrote:
>
> >Hi Chris - I managed to get Samza 0.8.0 building locally with this change
> >in BrokerProxy:
> >
> >  def stop {
> >    info("Shutting down " + toString)
> >
> >    var count = 0
> >    while(thread.isAlive) {
> >      thread.interrupt
> >      count += 1
> >    }
> >    info(s"Took $count interrupts")
> >    thread.join
> >  }
> >
> >So far I've tried 5 times, and shutdowns worked every time! Here is an
> >example of logs I see now:
> >
> >https://gist.githubusercontent.com/zcox/2c638b2d204b61dcce35/raw/9380499a192db7ba0d0bdb0c2330e6354730f8c8/gistfile1.txt
> >
> >The numbers of interrupts in the 5 runs were 345787, 2275, 1699, 47574,
> >and 4996.
> >
> >Seems like that was the problem, since it takes so many interrupts before
> >isAlive returns false.
> >
> >I'm assuming this will be fixed in 0.9.0 - any targets yet for when that
> >release might be out? Just trying to plan the best way to get this fix.
> >
> >Thanks,
> >Zach
> >
> >
> >On Thu, Jan 15, 2015 at 2:31 PM, Chris Riccomini <[email protected]> wrote:
> >
> >> Hey Zach,
> >>
> >> Cool. I've engaged with some devs to understand if SimpleConsumer might
> >> be swallowing the interrupt.
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 1/15/15 12:29 PM, "Zach Cox" <[email protected]> wrote:
> >>
> >> >Hi Chris,
> >> >
> >> >Here is an example of logs when I see proper shutdown:
> >> >
> >> >https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
> >> >
> >> >That does show the InterruptedException "Got interrupt exception in broker
> >> >proxy thread." line.
> >> >
> >> >Here's java -version output on the Samza container:
> >> >
> >> >  java version "1.6.0_33"
> >> >  OpenJDK Runtime Environment (IcedTea6 1.13.5) (6b33-1.13.5-1ubuntu0.14.04)
> >> >  OpenJDK 64-Bit Server VM (build 23.25-b01, mixed mode)
> >> >
> >> >Currently all of this stuff (Mesos, Kafka, Samza, etc) is running on my
> >> >laptop.
> >> >
> >> >I've just been using the released Samza 0.8.0 jars from Maven Central. I
> >> >will try to get Samza building locally, add that tight interrupt loop,
> >> >and see what happens.
> >> >
> >> >Thanks,
> >> >Zach
> >> >
> >> >
> >> >On Thu, Jan 15, 2015 at 1:11 PM, Chris Riccomini <[email protected]> wrote:
> >> >
> >> >> Hey Zach,
> >> >>
> >> >> Can you try modifying this line in BrokerProxy:
> >> >>
> >> >> https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L283
> >> >>
> >> >> so that it interrupts in a tight loop and keeps checking the thread
> >> >> status? Something like:
> >> >>
> >> >>   while(thread.isAlive) {
> >> >>     thread.interrupt
> >> >>   }
> >> >>
> >> >> I want to see if we can catch the thread at a point when it's sleeping,
> >> >> not doing IO on the wire. If we can, the thread should shut down cleanly,
> >> >> and we should see the "Got interrupt exception in broker proxy thread."
> >> >> log line.
> >> >>
> >> >> Cheers,
> >> >> Chris
> >> >>
> >> >> On 1/15/15 11:01 AM, "Chris Riccomini" <[email protected]> wrote:
> >> >>
> >> >> >Hey Zach,
> >> >> >
> >> >> >I'm beginning to wonder if this line is the problem:
> >> >> >
> >> >> >https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
> >> >> >
> >> >> >If sendRequest in SimpleConsumer catches a Throwable, it will catch
> >> >> >ClosedByInterruptException. This will cause the interrupt flag to get
> >> >> >reset, I think. If the consumer then retries the send, and it succeeds,
> >> >> >then I think it will continue on and ignore the interrupt. Going to poke
> >> >> >a Kafka dev to see if I am misunderstanding something.
> >> >> >
> >> >> >Cheers,
> >> >> >Chris
> >> >> >
> >> >> >On 1/15/15 10:52 AM, "Chris Riccomini" <[email protected]> wrote:
> >> >> >
> >> >> >>Hey Zach,
> >> >> >>
> >> >> >>Hmm. It sounds like something is catching the InterruptedException and
> >> >> >>letting the interrupt flag get reset. If that happens, the
> >> >> >>ExponentialSleepStrategy [1] will continue running.
> >> >> >>
> >> >> >>I walked through the code but couldn't find where it might be doing so,
> >> >> >>though it's hard to catch these without a test.
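The mechanism Chris describes (an interrupt caught and discarded, so a retry loop keeps going) can be reproduced in a few lines. The demo below is not Samza or Kafka code and its names are made up; it uses Thread.sleep, which clears the thread's interrupt flag before throwing InterruptedException. A catch-all that discards the exception therefore leaves the flag cleared, and a loop that polls Thread.currentThread.isInterrupted never stops. Re-interrupting inside the catch block (the usual idiom) restores the flag so the loop exits. Whether this is exactly what happens around ExponentialSleepStrategy is not shown here.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical demo, not Samza or Kafka code.
object InterruptFlagDemo {
  // A retry-style loop that runs until its thread's interrupt flag is set.
  // `restoreFlag` controls what the catch-all does with the exception.
  def worker(restoreFlag: Boolean, sleeping: CountDownLatch): Runnable = new Runnable {
    def run(): Unit =
      while (!Thread.currentThread.isInterrupted) {
        sleeping.countDown()
        try Thread.sleep(60000L) // clears the interrupt flag before throwing InterruptedException
        catch {
          case _: Throwable =>
            // Swallowing leaves the flag cleared, so the while check keeps the loop going.
            // Re-interrupting restores the flag, so the while check ends the loop.
            if (restoreFlag) Thread.currentThread.interrupt()
        }
      }
  }

  // Starts a worker, interrupts it once after it reaches the sleep, and reports whether it exited in time.
  def exitsAfterOneInterrupt(restoreFlag: Boolean, waitMs: Long): Boolean = {
    val sleeping = new CountDownLatch(1)
    val t = new Thread(worker(restoreFlag, sleeping))
    t.setDaemon(true) // a swallowing worker would otherwise keep the JVM alive
    t.start()
    sleeping.await(5L, TimeUnit.SECONDS)
    t.interrupt()
    t.join(waitMs)
    !t.isAlive
  }
}
```

With `restoreFlag = false` one interrupt is lost and the worker keeps running; with `restoreFlag = true` it exits right away.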
> >> >> >>
> >> >> >>> I have never seen that "Restarting consumer due to ..." warn line in
> >> >> >>> the logs.
> >> >> >>
> >> >> >>If that's the case, then I don't think the BrokerProxy can be
> >> >> >>reconnecting, since reconnect must be set to true, which would require
> >> >> >>seeing that log line [2].
> >> >> >>
> >> >> >>Out of curiosity, what version of the JVM are you using? Trying to
> >> >> >>replicate this is proving tricky.
> >> >> >>
> >> >> >>I'm beginning to wonder if this has to do with InterruptedException vs.
> >> >> >>ClosedByInterruptException. In TestStatefulTask, I only see the first
> >> >> >>log line:
> >> >> >>
> >> >> >>  case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
> >> >> >>  case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
> >> >> >>
> >> >> >>This is probably because the test connects to a local Kafka broker, so
> >> >> >>NIO socket operations are extremely fast, and the test almost always
> >> >> >>gets interrupted on a Thread.sleep, not a blocking socket operation. I'm
> >> >> >>wondering if a ClosedByInterruptException would cause the hanging that
> >> >> >>you're seeing.
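To see which exception a thread gets when it is interrupted during NIO I/O rather than during Thread.sleep, a java.nio.channels.Pipe can stand in for the broker socket (an assumption made to keep the demo self-contained). The reader below blocks in read() on a pipe nobody writes to. Per the JDK documentation for Thread.interrupt, interrupting a thread blocked on an interruptible channel closes the channel, sets the thread's interrupt status, and delivers a ClosedByInterruptException, so the demo should report that exception with the flag still set. What SimpleConsumer does after catching it is outside what this shows.

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.util.concurrent.atomic.AtomicReference

// Hypothetical demo: a Pipe stands in for the broker socket.
object ClosedByInterruptDemo {
  // Returns the simple name of the exception the blocked reader saw, and whether
  // its interrupt flag was still set inside the catch block.
  def interruptBlockedRead(): (String, Boolean) = {
    val pipe = Pipe.open()
    val seen = new AtomicReference[(String, Boolean)](("no exception", false))
    val reader = new Thread(new Runnable {
      def run(): Unit =
        try {
          pipe.source.read(ByteBuffer.allocate(16)) // nothing is ever written, so this blocks
          ()
        } catch {
          case e: Throwable =>
            seen.set((e.getClass.getSimpleName, Thread.currentThread.isInterrupted))
        }
    })
    reader.start()
    Thread.sleep(100L) // give the reader time to block inside read()
    reader.interrupt() // closes the pipe's source channel and wakes the reader
    reader.join(5000L)
    pipe.sink.close()
    seen.get
  }
}
```

Because the channel itself is closed by the interrupt, code that catches this exception and carries on has to open a new channel before it can read again.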
> >> >> >>
> >> >> >>
> >> >> >>Out of curiosity, is your container on the same machine as your broker,
> >> >> >>or a different machine?
> >> >> >>
> >> >> >>Cheers,
> >> >> >>Chris
> >> >> >>
> >> >> >>[1] https://github.com/apache/incubator-samza/blob/0.8.0/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala#L80
> >> >> >>[2] https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L133
> >> >> >>
> >> >> >>On 1/15/15 9:38 AM, "Zach Cox" <[email protected]> wrote:
> >> >> >>
> >> >> >>>Hi Chris,
> >> >> >>>
> >> >> >>>
> >> >> >>>> Looking at your logs (from a previous email, granted, a different
> >> >> >>>> execution), I see that we get as far as SimpleConsumer.disconnect():
> >> >> >>>>
> >> >> >>>>   [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy ...] o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
> >> >> >>>>
> >> >> >>>> This is invoked by BrokerProxy.scala:135. The only way this should get
> >> >> >>>> invoked is if an exception is caught. Do you see this log line
> >> >> >>>> anywhere?
> >> >> >>>>
> >> >> >>>>   warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
> >> >> >>>>
> >> >> >>>
> >> >> >>>DefaultFetchSimpleConsumer.close() calls SimpleConsumer.close(), which
> >> >> >>>calls disconnect(), which also logs that "Disconnecting from
> >> >> >>>localhost:9192" line:
> >> >> >>>
> >> >> >>>https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala#L44
> >> >> >>>
> >> >> >>>https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L50
> >> >> >>>
> >> >> >>>I have never seen that "Restarting consumer due to ..." warn line in the
> >> >> >>>logs.
> >> >> >>>
> >> >> >>>Also note that I'm using Samza 0.8.0, not master.
> >> >> >>>
> >> >> >>>
> >> >> >>>> I ran our TestStatefulTask test a few times, and verified that the
> >> >> >>>> consumer CAN be shut down:
> >> >> >>>>
> >> >> >>>>   [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down consumer multiplexer.
> >> >> >>>>   [ThreadJob] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down BrokerProxy for localhost:59075
> >> >> >>>>   2015-01-15 08:59:26 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
> >> >> >>>>   [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Got closed by interrupt exception in broker proxy thread.
> >> >> >>>>   [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down due to interrupt.
> >> >> >>>>   // consumer shutdown is complete here. The container has moved on to shutting down the producer.
> >> >> >>>>   [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down producer multiplexer.
> >> >> >>>>
> >> >> >>>
> >> >> >>>Yesterday afternoon I was showing this to a colleague, and we did observe
> >> >> >>>a proper clean shutdown once. Then the next 2 times I showed him, we saw
> >> >> >>>the BrokerProxy blocking issue. So I do observe proper shutdown
> >> >> >>>sometimes too; it just seems a lot more common that BrokerProxy blocks
> >> >> >>>and the container doesn't shut down.
> >> >> >>>
> >> >> >>>Thanks,
> >> >> >>>Zach
> >> >> >>>
> >> >> >>>
> >> >> >>>> Cheers,
> >> >> >>>> Chris
> >> >> >>>>
> >> >> >>>> On 1/14/15 7:24 PM, "Zach Cox" <[email protected]> wrote:
> >> >> >>>>
> >> >> >>>> >Hi Chris,
> >> >> >>>> >
> >> >> >>>> >I did a thread dump during the shutdown process, when I could tell it
> >> >> >>>> >wasn't shutting down properly [1]. You can see this thread dump in the
> >> >> >>>> >context of the other logging at shutdown.
> >> >> >>>> >
> >> >> >>>> >The main thread is indeed blocked on the Thread.join call in
> >> >> >>>> >BrokerProxy.stop. The BrokerProxy thread looks like it's still
> >> >> >>>> >consuming from Kafka?
> >> >> >>>> >
> >> >> >>>> >The thread dump was a bit tricky: this Samza container is running in a
> >> >> >>>> >Docker container on boot2docker, which is Tiny Core Linux, so there's no
> >> >> >>>> >jstack. I never knew this, but `sudo kill -3 [pid]` tells the JVM to do
> >> >> >>>> >a thread dump to stdout [2] :)
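Besides kill -3, another option when jstack isn't available is to have the JVM dump its own threads with Thread.getAllStackTraces, for example from a timer that fires if shutdown is taking too long. The sketch below builds a loosely jstack-style text dump; the object name and output format are assumptions, and unlike jstack or kill -3 it shows no lock or monitor information.

```scala
// Hypothetical helper: a jstack-like text dump of all live threads, built in-process.
object ThreadDump {
  def dump(): String = {
    val sb = new StringBuilder
    val it = Thread.getAllStackTraces.entrySet.iterator
    while (it.hasNext) {
      val entry = it.next()
      val t = entry.getKey
      // Header line: quoted thread name, daemon flag, and Thread.State (e.g. TIMED_WAITING).
      sb.append('"').append(t.getName).append("\" daemon=").append(t.isDaemon)
        .append(" state=").append(t.getState).append('\n')
      // One line per stack frame, innermost frame first.
      for (frame <- entry.getValue) sb.append("    at ").append(frame).append('\n')
      sb.append('\n')
    }
    sb.toString
  }
}
```

Printing `ThreadDump.dump()` to stderr from a daemon thread that sleeps through a grace period would show where the main thread and the BrokerProxy threads are stuck, without needing any extra tools in the container.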
> >> >> >>>> >
> >> >> >>>> >Thanks,
> >> >> >>>> >Zach
> >> >> >>>> >
> >> >> >>>> >[1] https://gist.githubusercontent.com/zcox/bb47a61d4ae1acd54056/raw/83224d20a61de20bb47bcf916d92930a71cd97ad/gistfile1.txt
> >> >> >>>> >
> >> >> >>>> >[2] http://serverfault.com/questions/574745/jstack-alternative-for-linux-redhat-6
> >> >> >>>> >
> >> >> >>>> >
> >> >> >>>> >On Wed, Jan 14, 2015 at 7:30 PM, Chris Riccomini <[email protected]> wrote:
> >> >> >>>> >
> >> >> >>>> >> Hey Zach,
> >> >> >>>> >>
> >> >> >>>> >> It sounds likely that the BrokerProxy thread is blocking or
> >> >> >>>> >> improperly catching an interrupt exception.
> >> >> >>>> >>
> >> >> >>>> >> Can you take a thread dump? My guess is that you'll see that the main
> >> >> >>>> >> thread is blocked on BrokerProxy.stop:
> >> >> >>>> >>
> >> >> >>>> >>   thread.interrupt
> >> >> >>>> >>   thread.join
> >> >> >>>> >>
> >> >> >>>> >>
> >> >> >>>> >> It'll likely be blocked on thread.join. If that's the case, I'd like to
> >> >> >>>> >> see what the BrokerProxy thread is doing. This line indicates that the
> >> >> >>>> >> thread seems to be shutting down:
> >> >> >>>> >>
> >> >> >>>> >> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at localhost:9192 for client samza_consumer-twitter_message_separation-1-1421255914862-1] o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
> >> >> >>>> >>
> >> >> >>>> >> But if thread.join is blocking, then the shutdown never seems to
> >> >> >>>> >> complete. It'd be good to see where these threads are.
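Following Chris's guess that the main thread sits in an unbounded thread.join, a stop that waits for a bounded time and then reports where the worker is stuck, instead of hanging, might look like the sketch below. The object and method names are hypothetical and this is not Samza's BrokerProxy.stop; it interrupts only once, so a thread that swallows interrupts still needs something like Zach's tight interrupt loop.

```scala
// Hypothetical sketch; not Samza's BrokerProxy.stop.
object StopWithTimeout {
  // Interrupts `thread` once and waits at most `timeoutMs` for it, instead of an unbounded join.
  // Returns None if it exited, or Some(report) with its current stack if it is still running.
  def interruptAndJoin(thread: Thread, timeoutMs: Long): Option[String] = {
    thread.interrupt()
    thread.join(timeoutMs)
    if (!thread.isAlive) None
    else {
      val frames = thread.getStackTrace.map(frame => "    at " + frame).mkString("\n")
      Some("\"" + thread.getName + "\" still alive after " + timeoutMs + " ms\n" + frames)
    }
  }
}
```

The caller can log the report and then decide whether to keep interrupting, give up, or let the process exit anyway.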
> >> >> >>>> >>
> >> >> >>>> >>
> >> >> >>>> >> Cheers,
> >> >> >>>> >> Chris
> >> >> >>>> >>
> >> >> >>>> >> On 1/14/15 10:49 AM, "Zach Cox" <[email protected]> wrote:
> >> >> >>>> >>
> >> >> >>>> >> >Hi - related to the discussion yesterday about graceful shutdown [1],
> >> >> >>>> >> >today I can't seem to get the SamzaContainer to actually shut down.
> >> >> >>>> >> >Yesterday I was seeing nice, fast, complete shutdown logs [2]. However,
> >> >> >>>> >> >today the last log line I see is related to shutting down BrokerProxy,
> >> >> >>>> >> >then nothing else happens until the container is finally just
> >> >> >>>> >> >SIGKILLed by Docker [3].
> >> >> >>>> >> >
> >> >> >>>> >> >Clean, fast shutdowns happened consistently yesterday and never happen
> >> >> >>>> >> >today, which leads me to believe I've changed something, but I really
> >> >> >>>> >> >can't find anything that would lead to this. It's almost like there's
> >> >> >>>> >> >some deadlock in BrokerProxy, but I would think that would show up
> >> >> >>>> >> >randomly; I've tried this about 100 times today, and every time the
> >> >> >>>> >> >logs are as shown in [3]. Today I also see 2 exceptions in the Kafka
> >> >> >>>> >> >broker logs [4], but I don't know whether those were also occurring
> >> >> >>>> >> >yesterday, when Samza was shutting down cleanly.
> >> >> >>>> >> >
> >> >> >>>> >> >One thing that doesn't seem to be happening today is this log line
> >> >> >>>> >> >from BrokerProxy: "Got interrupt exception in broker proxy thread."
> >> >> >>>> >> >
> >> >> >>>> >> >Has anyone seen this, or does anyone have advice on what to try next?
> >> >> >>>> >> >
> >> >> >>>> >> >Thanks,
> >> >> >>>> >> >Zach
> >> >> >>>> >> >
> >> >> >>>> >> >
> >> >> >>>> >> >[1] http://www.mail-archive.com/[email protected]/msg02246.html
> >> >> >>>> >> >[2] https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
> >> >> >>>> >> >[3] https://gist.githubusercontent.com/zcox/673a2ba607c566de7650/raw/d29ea8b99868da9cc4dc1db198315ee1d03bc694/gistfile1.txt
> >> >> >>>> >> >[4] https://gist.githubusercontent.com/zcox/f08de55c5d5fe2d70cde/raw/c4be453b5f6a57cfb5acb08508667f4b8ce8c2bd/gistfile1.txt