[ 
https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15222102#comment-15222102
 ] 

Jason Gustafson commented on KAFKA-3491:
----------------------------------------

Good catch. The best workaround at the moment would probably be to call 
unsubscribe() prior to closing in the exception handler:

{code}
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
      // drop the current positions so that close() does not auto-commit
      // offsets for records that were read but never processed
      consumer.unsubscribe();
    } finally {
      consumer.close();
    }
  }
{code}

This causes the consumer to abandon its current positions, which prevents the 
automatic commit in close(). I think we can either update the documentation to 
use that pattern or try to fix the underlying behavior. One option would be to 
turn off the commit-on-close() behavior entirely. This would be unfortunate, 
since we'd have duplicates on virtually every close(), but it doesn't require 
any change to the API, and we only try to provide "at least once" delivery 
anyway. In that case, we may as well start officially discouraging automatic 
commit in the documentation. We could also add a new close() method with a flag 
to indicate whether the shutdown is clean or not, but I think this would be 
even uglier than the unsubscribe pattern above.
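
To make the difference between the two shutdown paths concrete, here is a toy 
model of the behavior described above. It is only a sketch: ToyConsumer and 
CloseDemo are made-up names, not part of the Kafka client, and the model 
assumes nothing more than "close() commits the current position unless the 
subscription was dropped first".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy stand-in for a consumer with commit-on-close. NOT the Kafka client. */
class ToyConsumer {
    private final List<String> log;   // records available to read
    private boolean subscribed = false;
    private long position = 0;        // next offset to fetch
    private long committed = 0;       // last committed offset

    ToyConsumer(List<String> log) { this.log = log; }

    /** Starts reading from the last committed offset. */
    void subscribe() { subscribed = true; position = committed; }

    /** Returns up to maxRecords records and advances the position past them. */
    List<String> poll(int maxRecords) {
        int end = (int) Math.min(log.size(), position + maxRecords);
        List<String> batch = new ArrayList<>(log.subList((int) position, end));
        position = end;
        return batch;
    }

    /** Drops the subscription and, with it, the in-memory position. */
    void unsubscribe() { subscribed = false; }

    /** Models commit-on-close: commits the position only while subscribed. */
    void close() { if (subscribed) committed = position; }

    long committed() { return committed; }
}

class CloseDemo {
    /** Runs the loop once; the record "bad" makes processing fail. */
    static long run(boolean unsubscribeOnError) {
        ToyConsumer consumer = new ToyConsumer(Arrays.asList("a", "bad", "c"));
        try {
            consumer.subscribe();
            for (String record : consumer.poll(3)) {
                if (record.equals("bad")) {
                    throw new IllegalStateException("processing failed");
                }
            }
        } catch (RuntimeException e) {
            if (unsubscribeOnError) consumer.unsubscribe();
        } finally {
            consumer.close();
        }
        return consumer.committed();
    }

    public static void main(String[] args) {
        System.out.println("close only:          committed=" + run(false));
        System.out.println("unsubscribe + close: committed=" + run(true));
    }
}
```

In this toy run, close() alone commits offset 3 even though processing stopped 
at the second record, while unsubscribing first leaves the committed offset at 
0, so the first record would be delivered again after a restart.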

[~guozhang] [~ewencp] Thoughts?

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> Imagine you have a run loop that looks like the following:
> {code:java}
>   public void run() {
>     try {
>       consumer.subscribe(topics);
>       while (true) {
>         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>         records.forEach(record -> process(record));
>       }
>     } catch (WakeupException e) {
>       // ignore, we're closing
>     } catch (Exception e) {
>       log.error("Unexpected error", e);
>     } finally {
>       consumer.close();
>     }
>   }
> {code}
> If you run this with 'enable.auto.commit=true' and an exception is thrown in 
> the 'process()' method, close() will still try to commit the offsets of all 
> records in the most recent batch that were read but not processed.



