Repository: kafka
Updated Branches:
  refs/heads/trunk feab5a374 -> d9f052acc
KAFKA-3501: Console consumer process hangs on exit

- replace `System.exit(1)` with a regular `return` in order to release the latch blocking the shutdown hook thread from shutting down the JVM
- provide `PrintStream` to the `process` method in order to ease unit testing

Author: Sebastien Launay <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1185 from slaunay/bugfix/KAFKA-3501-console-consumer-hangs

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9f052ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9f052ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9f052ac

Branch: refs/heads/trunk
Commit: d9f052acc396871801edee13ef0a6042a9af6626
Parents: feab5a3
Author: Sebastien Launay <[email protected]>
Authored: Mon Jun 6 21:34:33 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Jun 6 21:34:33 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 20 ++++++++-----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 31 ++++++++++++++++++--
 2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9f052ac/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 8953640..3b7a214 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -70,9 +70,10 @@ object ConsoleConsumer extends Logging {
     addShutdownHook(consumer, conf)
 
     try {
-      process(conf.maxMessages, conf.formatter, consumer, conf.skipMessageOnError)
+      process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError)
     } finally {
       consumer.cleanup()
+      conf.formatter.close()
       reportRecordCount()
 
       // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
@@ -111,7 +112,7 @@ object ConsoleConsumer extends Logging {
     })
   }
 
-  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer, skipMessageOnError: Boolean) {
+  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer, output: PrintStream, skipMessageOnError: Boolean) {
     while (messageCount < maxMessages || maxMessages == -1) {
       val msg: BaseConsumerRecord = try {
         consumer.receive()
@@ -132,7 +133,7 @@ object ConsoleConsumer extends Logging {
       messageCount += 1
       try {
         formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
-          msg.timestampType, 0, 0, 0, msg.key, msg.value), System.out)
+          msg.timestampType, 0, 0, 0, msg.key, msg.value), output)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {
@@ -142,7 +143,10 @@ object ConsoleConsumer extends Logging {
             throw e
           }
       }
-      checkErr(formatter)
+      if (checkErr(output, formatter)) {
+        // Consumer will be closed
+        return
+      }
     }
   }
 
@@ -150,13 +154,13 @@ object ConsoleConsumer extends Logging {
     System.err.println(s"Processed a total of $messageCount messages")
   }
 
-  def checkErr(formatter: MessageFormatter) {
-    if (System.out.checkError()) {
+  def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
+    val gotError = output.checkError()
+    if (gotError) {
       // This means no one is listening to our output stream any more, time to shutdown
       System.err.println("Unable to write to standard out, closing consumer.")
-      formatter.close()
-      System.exit(1)
     }
+    gotError
   }
 
   def getOldConsumerProps(config: ConsumerConfig): Properties = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9f052ac/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 31b3211..c3ebade 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.tools
 
-import java.io.FileOutputStream
+import java.io.{PrintStream, FileOutputStream}
 
 import kafka.common.MessageFormatter
 import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
@@ -47,7 +47,34 @@ class ConsoleConsumerTest extends JUnitSuite {
     EasyMock.replay(formatter)
 
     //Test
-    ConsoleConsumer.process(messageLimit, formatter, consumer, true)
+    ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true)
+  }
+
+  @Test
+  def shouldStopWhenOutputCheckErrorFails() {
+    //Mocks
+    val consumer = EasyMock.createNiceMock(classOf[BaseConsumer])
+    val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
+    val printStream = EasyMock.createNiceMock(classOf[PrintStream])
+
+    //Stubs
+    val record = new BaseConsumerRecord(topic = "foo", partition = 1, offset = 1, key = Array[Byte](), value = Array[Byte]())
+
+    //Expectations
+    EasyMock.expect(consumer.receive()).andReturn(record)
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.eq(printStream)))
+    //Simulate an error on System.out after the first record has been printed
+    EasyMock.expect(printStream.checkError()).andReturn(true)
+
+    EasyMock.replay(consumer)
+    EasyMock.replay(formatter)
+    EasyMock.replay(printStream)
+
+    //Test
+    ConsoleConsumer.process(-1, formatter, consumer, printStream, true)
+
+    //Verify
+    EasyMock.verify(consumer, formatter, printStream)
   }
 
   @Test
