Repository: camel Updated Branches: refs/heads/camel-2.15.x 31cbc3c1c -> 9912202b4
CAMEL-8636 Committed the last batch of messages when the auto commit is false Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9912202b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9912202b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9912202b Branch: refs/heads/camel-2.15.x Commit: 9912202b4e1b184dcae69d806d43842fa4af2ecd Parents: 31cbc3c Author: Willem Jiang <[email protected]> Authored: Wed Apr 15 14:39:41 2015 +0800 Committer: Willem Jiang <[email protected]> Committed: Wed Apr 15 14:39:41 2015 +0800 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 24 +++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9912202b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index d6b49d2..46d258d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -136,29 +136,31 @@ public class KafkaConsumer extends DefaultConsumer { boolean consumerTimeout; MessageAndMetadata<byte[], byte[]> mm = null; ConsumerIterator<byte[], byte[]> it = stream.iterator(); - - while (true) { + boolean hasNext = true; + while (hasNext) { try { consumerTimeout = false; if (it.hasNext()) { mm = it.next(); + Exchange exchange = endpoint.createKafkaExchange(mm); + try { + processor.process(exchange); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + 
processed++; } else { - break; - } - Exchange exchange = endpoint.createKafkaExchange(mm); - try { - processor.process(exchange); - } catch (Exception e) { - LOG.error(e.getMessage(), e); + // we don't need to process the message + hasNext = false; } - processed++; } catch (ConsumerTimeoutException e) { LOG.debug(e.getMessage(), e); consumerTimeout = true; } - if (processed >= endpoint.getBatchSize() || consumerTimeout) { + if (processed >= endpoint.getBatchSize() || consumerTimeout + || (processed > 0 && !hasNext)) { // Need to commit the offset for the last round try { berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS); if (!consumerTimeout) {
