Repository: kafka Updated Branches: refs/heads/trunk a9687bc0d -> 908b6d114
KAFKA-4521; MirrorMaker should flush all messages before releasing partition ownership during rebalance Author: Dong Lin <lindon...@gmail.com> Author: Dong Lin <do...@linkedin.com> Reviewers: Jiangjie Qin <becket....@gmail.com> Closes #2241 from lindong28/KAFKA-4521 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/908b6d11 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/908b6d11 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/908b6d11 Branch: refs/heads/trunk Commit: 908b6d1148df963d21a70aaa73a7a87571b965a9 Parents: a9687bc Author: Dong Lin <lindon...@gmail.com> Authored: Thu Dec 15 13:23:01 2016 -0800 Committer: Jiangjie Qin <becket....@gmail.com> Committed: Thu Dec 15 13:23:01 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/tools/MirrorMaker.scala | 50 ++++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/908b6d11/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 2cfcb95..19a2570 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -470,11 +470,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def maybeFlushAndCommitOffsets() { - if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { - debug("Committing MirrorMaker state automatically.") + val commitRequested = mirrorMakerConsumer.commitRequested() + if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { + debug("Committing MirrorMaker state.") producer.flush() commitOffsets(mirrorMakerConsumer) lastOffsetCommitMs = System.currentTimeMillis() + if (commitRequested) + mirrorMakerConsumer.notifyCommit() } } @@ -503,12 +506,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer { def init() + def commitRequested(): Boolean + def notifyCommit() + def requestAndWaitForCommit() def hasData : Boolean } private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector, filterSpec: TopicFilter) extends MirrorMakerBaseConsumer { private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null + private var immediateCommitRequested: Boolean = false override def init() { // Creating one stream per each connector instance @@ -518,6 +525,29 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { iter = stream.iterator() } + override def requestAndWaitForCommit() { + this.synchronized { + // skip wait() if mirrorMakerConsumer has not been initialized + if (iter != null) { + immediateCommitRequested = true + this.wait() + } + } + } + + override def notifyCommit() { + this.synchronized { + immediateCommitRequested = false + this.notifyAll() + } + } + + override def commitRequested(): Boolean = { + this.synchronized { + immediateCommitRequested + } + } + override def hasData = iter.hasNext() override def receive() : BaseConsumerRecord = { @@ -570,6 +600,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } + override def requestAndWaitForCommit() { + // Do nothing + } + + override def notifyCommit() { + // Do nothing + } + + override def commitRequested(): Boolean = { + false + } + override def hasData = true override def receive() : BaseConsumerRecord = { @@ -632,8 +674,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { extends ConsumerRebalanceListener { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { - producer.flush() - commitOffsets(mirrorMakerConsumer) + // The zookeeper listener thread, which executes this method, needs to wait for MirrorMakerThread to flush data and commit offset + mirrorMakerConsumer.requestAndWaitForCommit() // invoke custom consumer rebalance listener customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership)) }