Repository: kafka Updated Branches: refs/heads/0.9.0 64d512959 -> 8f23db4c6
KAFKA-3157: Mirror maker doesn't commit offset with low traffic Mirror maker doesn't commit offset with new consumer enabled when data volume is low. This is caused by infinite loop in ```receive()``` which would never jump out of loop if no data coming Author: Tao Xiao <[email protected]> Reviewers: Ismael Juma, Jason Gustafson Closes #821 from xiaotao183/KAFKA-3157 (cherry picked from commit d6e36df8737aa2b898b4fd0a81c2d94f3c349b68) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f23db4c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f23db4c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f23db4c Branch: refs/heads/0.9.0 Commit: 8f23db4c6ab24ae6d0c7ecabe42cd6f2d4edb089 Parents: 64d5129 Author: Tao Xiao <[email protected]> Authored: Fri Feb 5 09:11:24 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Fri Feb 5 09:11:32 2016 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/tools/MirrorMaker.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8f23db4c/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b093cb2..a285ac5 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -519,12 +519,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener) } - // New consumer always hasNext override def hasData = true override def receive() : BaseConsumerRecord = { - while (recordIter == null || !recordIter.hasNext) + if (recordIter == null || !recordIter.hasNext) { recordIter = consumer.poll(1000).iterator + if (!recordIter.hasNext) + throw new ConsumerTimeoutException + } val record = recordIter.next() val tp = new TopicPartition(record.topic, record.partition)
