KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd262ac7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd262ac7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd262ac7 Branch: refs/heads/trunk Commit: bd262ac708062e502406e8d775f4c9432a5364e7 Parents: 3c27988 Author: Neha Narkhede <[email protected]> Authored: Wed Apr 3 13:43:50 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Wed Apr 3 13:43:50 2013 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/tools/KafkaMigrationTool.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bd262ac7/core/src/main/scala/kafka/tools/KafkaMigrationTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index a15b350..3c18286 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -385,8 +385,10 @@ public class KafkaMigrationTool { try{ while(true) { KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest(); - if(!data.equals(shutdownMessage)) + if(!data.equals(shutdownMessage)) { producer.send(data); + if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message()))); + } else break; } @@ -410,6 +412,7 @@ public class KafkaMigrationTool { public void awaitShutdown() { try { shutdownComplete.await(); + producer.close(); logger.info("Producer thread " + threadName + " shutdown complete"); } catch(InterruptedException ie) { logger.warn("Interrupt during shutdown of ProducerThread", ie);
