[ 
https://issues.apache.org/jira/browse/KAFKA-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15115708#comment-15115708
 ] 

Jason Gustafson commented on KAFKA-3146:
----------------------------------------

[~darkjh] You need to commit offsets regularly for the progress to be updated. 
In the gist above, you only commit when the consumer is shutdown or rebalanced. 
I changed the poll loop to the following and ran several times on the 0.9.0 
branch without seeing any problems:

{code}
def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      try {
        val records = kafkaConsumer.poll(100)
        println(s"$name pulled ${records.size} msgs ...")
        Thread.sleep(50)
        for (msg <- records) {
          count += 1
        }
        commitOffset()
      } catch {
        case e: Exception =>
          println("Error during processing of the message: " + e)
      }
    }
  }
{code}

Note the call to commitSync() on every iteration of the poll loop. Can you try 
this change against the 0.9.0 branch and see if you still experience the issue?

Couple more notes from the gist: the commitSync call in the shutdown hook is 
incorrect. The consumer is not safe for concurrent access. A better way to 
handle it is to use wakeup() to interrupt the poll loop, and then call 
commitSync() before closing the consumer (which you should also make sure to 
do). You also need to make sure that you give the consumer enough time to 
actually commit the offsets prior to shutting down. Since you're using an 
executor, the easiest way to do this is to call shutdown() followed by 
awaitTermination(). Here's a gist which shows what I mean: 
https://gist.github.com/hachikuji/35023fe424438253ada3.

> Stuck consumer with new consumer API in 0.9
> -------------------------------------------
>
>                 Key: KAFKA-3146
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3146
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: Han JU
>            Assignee: Jason Gustafson
>
> I'm prototyping with the new consumer API of kafka 0.9 and I'm particularly 
> interested in the `ConsumerRebalanceListener`.
> My test setup is like the following:
>   - 5M messages pre-loaded in one node kafka 0.9
>   - 12 partitions, auto offset commit set to false
>   - in `onPartitionsRevoked`, commit offset and flush the local state
> The test run is like the following:
>   - launch one process with 2 consumers and let it consume for a while
>   - launch another process with 2 consumers, this triggers a rebalancing, and 
> let these 2 processes run until messages are all consumed
> The code is here: https://gist.github.com/darkjh/fe1e5a5387bf13b4d4dd
> So at first, the 2 consumers of the first process each got 6 partitions. And 
> after the rebalancing, each consumer got 3 partitions. It's confirmed by 
> logging inside the `onPartitionAssigned` callback.
> But after the rebalancing, one of the 2 consumers of the first process stop 
> receiving messages, even if it has partitions assigned to: 
> balance-1 pulled 7237 msgs ...
> balance-0 pulled 7263 msgs ...
> 2016-01-22 15:50:37,533 [INFO] [pool-1-thread-2] 
> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since the 
> group is rebalancing, try to re-join group.
> balance-1 flush @ 536637
> balance-1 committed offset for List(balance-11, balance-10, balance-9, 
> balance-8, balance-7, balance-6)
> 2016-01-22 15:50:37,575 [INFO] [pool-1-thread-1] 
> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since the 
> group is rebalancing, try to re-join group.
> balance-0 flush @ 543845
> balance-0 committed offset for List(balance-5, balance-4, balance-3, 
> balance-2, balance-1, balance-0)
> balance-0 got assigned List(balance-5, balance-4, balance-3)
> balance-1 got assigned List(balance-11, balance-10, balance-9)
> balance-1 pulled 3625 msgs ...
> balance-0 pulled 3621 msgs ...
> balance-0 pulled 3631 msgs ...
> balance-0 pulled 3631 msgs ...
> balance-1 pulled 0 msgs ...
> balance-0 pulled 3643 msgs ...
> balance-0 pulled 3643 msgs ...
> balance-1 pulled 0 msgs ...
> balance-0 pulled 3622 msgs ...
> balance-0 pulled 3632 msgs ...
> balance-1 pulled 0 msgs ...
> balance-0 pulled 3637 msgs ...
> balance-0 pulled 3641 msgs ...
> balance-0 pulled 3640 msgs ...
> balance-1 pulled 0 msgs ...
> balance-0 pulled 3632 msgs ...
> balance-0 pulled 3630 msgs ...
> balance-1 pulled 0 msgs ...
> ......
> `balance-0` and `balance-1` are the names of the consumer thread. So after 
> the rebalancing, thread `balance-1` continues to poll but no message arrive, 
> given that it has got 3 partitions assigned to after the rebalancing.
> Finally other 3 consumers pulls all their partitions' message, the situation 
> is like 
> GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
> balance-test, balance, 9, 417467, 417467, 0, consumer-2_/127.0.0.1
> balance-test, balance, 10, 417467, 417467, 0, consumer-2_/127.0.0.1
> balance-test, balance, 11, 417467, 417467, 0, consumer-2_/127.0.0.1
> balance-test, balance, 6, 180269, 417467, 237198, consumer-2_/127.0.0.1
> balance-test, balance, 7, 180036, 417468, 237432, consumer-2_/127.0.0.1
> balance-test, balance, 8, 180197, 417467, 237270, consumer-2_/127.0.0.1
> balance-test, balance, 3, 417467, 417467, 0, consumer-1_/127.0.0.1
> balance-test, balance, 4, 417468, 417468, 0, consumer-1_/127.0.0.1
> balance-test, balance, 5, 417468, 417468, 0, consumer-1_/127.0.0.1
> balance-test, balance, 0, 417467, 417467, 0, consumer-1_/127.0.0.1
> balance-test, balance, 1, 417467, 417467, 0, consumer-1_/127.0.0.1
> balance-test, balance, 2, 417467, 417467, 0, consumer-1_/127.0.0.1 
> So you can see, partition [6, 7, 8] still has messages, but the consumer 
> can't pull them after the rebalancing. 
> I've tried 0.9.0.0 release, trunk and 0.9.0 branch, for both server/broker 
> and client.
> One workaround (by Bruno Rassaerts), is to do a manual seek to the current 
> position in the `onPartitionsAssigned` call back.
> The corresponding mailing list discussion is here: 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201601.mbox/%3CCA%2BndhHok%3DemRceLuhwGHKwMCVQSmgTUeaxs-ycK-U2nLcc8Uhg%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to