Thanks for checking, Damian. The underlying problem in KAFKA-2978 is that the fetched position was getting out of sync with the consumed position (and fetches were only sent if the fetched position matched the consumed position). The reporter was seeing this problem after rebalancing, but perhaps it could also happen with manual partition assignment. Jason may be able to confirm.
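To make that invariant concrete, here is a rough sketch of the failure mode (illustrative only, not the actual Fetcher/SubscriptionState code; the class and field names are made up):

// Simplified model of the KAFKA-2978 failure mode, not the real consumer
// internals. The consumer tracks two positions per assigned partition and
// only issues a new fetch while they agree; if they drift apart (e.g. one
// is reset during a rebalance and the other is not), no further fetches are
// sent and poll() returns nothing even though a backlog exists on the broker.
class PartitionPositions {
    long fetched;   // next offset to request from the broker
    long consumed;  // next offset to hand back to the application

    boolean fetchable() {
        return fetched == consumed;   // once out of sync, this never becomes true again
    }
}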
Ismael

On Sat, Feb 13, 2016 at 8:43 PM, Damian Guy <damian....@gmail.com> wrote:

> I tried the 0.9.0.1 client and was able to successfully consume the
> partition without any problems.
>
> Thanks,
> Damian
>
> On 13 February 2016 at 14:49, Damian Guy <damian....@gmail.com> wrote:
>
> > Forgot to reply-all…
> >
> > On 13 Feb 2016, at 14:49, Damian Guy <damian....@gmail.com> wrote:
> >
> > Hi Ismael,
> >
> > In this case it wasn’t a rebalance as I was using consumer.assign(..).
> > I’ll give it a try with the 0.9.0.1 client to see if I can reproduce.
> > I’ve tried the 0.9.0.1 client with consumer groups and it seemed to work
> > fine even when constantly rebalancing. Thanks for the tip!
> >
> > Thanks,
> > Damian
> >
> > On 13 Feb 2016, at 13:16, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > Hi Damian,
> >
> > KAFKA-2978, which would cause consumption to stop, would happen after a
> > consumer group rebalance. Was this the case for you?
> >
> > It would be great if you could upgrade the client to 0.9.0.1 RC1 in order
> > to check if the problem still happens. There were other bugs fixed in the
> > 0.9.0 branch and it simplifies the analysis if we can rule them out. See
> > the following for the full list:
> >
> > https://github.com/apache/kafka/compare/0.9.0.0...0.9.0.1
> >
> > Thanks,
> > Ismael
> >
> > On Sat, Feb 13, 2016 at 9:43 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> >> I've been having some issues with the New Consumer. I'm aware there is a
> >> bug that has been fixed for 0.9.0.1, but is this the same thing?
> >> I'm using manual partition assignment due to latency issues making it
> >> near impossible to work with the group management features.
> >>
> >> So, my consumer was going along fine for most of the day - it just
> >> consumes from a topic with a single partition. However, it has just
> >> stopped receiving messages and I can see there is a backlog of around
> >> 100k messages to get through. Since message consumption has stopped I
> >> get the below "Marking the coordinator dead" log messages every 9
> >> minutes. I have done multiple stack dumps to see what is happening, one
> >> of which is below, and it always appears to be in consumer.poll.
> >>
> >> So.. same bug as the one I believe is fixed in 0.9.0.1? In which case
> >> I'll upgrade my client to the latest from the branch. Or is this
> >> something different?
> >>
> >> Thanks,
> >> Damian
> >>
> >> 2016/02/13 00:07:57 131.73 MB/1.8 GB INFO
> >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> >> Marking the coordinator 2147479630 dead.
> >> 2016/02/13 00:16:57 151.75 MB/1.79 GB INFO
> >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> >> Marking the coordinator 2147479630 dead.
> >> 2016/02/13 00:25:57 181.07 MB/1.76 GB INFO
> >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> >> Marking the coordinator 2147479630 dead.
> >>
> >> "poll-kafka-1" #45 prio=5 os_prio=0 tid=0x00007f7dba9da800 nid=0x52fd
> >> runnable [0x00007f7cecbe3000]
> >>    java.lang.Thread.State: RUNNABLE
> >>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> >>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >>         - locked <0x00000000ac6df4c8> (a sun.nio.ch.Util$2)
> >>         - locked <0x00000000ac6df4b0> (a java.util.Collections$UnmodifiableSet)
> >>         - locked <0x00000000ac53db20> (a sun.nio.ch.EPollSelectorImpl)
> >>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >>         at org.apache.kafka.common.network.Selector.select(Selector.java:425)
> >>         at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
> >>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
> >>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
> >>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
> >>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:877)
> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
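For context, the manual-assignment consume loop described in the original message looks roughly like the following (a minimal sketch only; the broker address, topic name, partition number and group id are placeholders, and the deserializer choice is an assumption):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "my-group");                   // with assign() the group is only used for offset commits
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Manual partition assignment: no group management or rebalancing involved.
        consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}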