Updated Branches: refs/heads/0.8 c02e7fd69 -> 53818bb7e
ConsoleProducer does not exit correctly; kafka-701; patched by Maxime Brugidou; reviewed by Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53818bb7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53818bb7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53818bb7

Branch: refs/heads/0.8
Commit: 53818bb7ee8022486eee06a22328200fc5cfced1
Parents: c02e7fd
Author: Jun Rao <jun...@gmail.com>
Authored: Mon Jan 14 09:21:10 2013 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Jan 14 09:21:10 2013 -0800

----------------------------------------------------------------------
 config/consumer.properties                      |  2 +-
 config/server.properties                        |  2 +-
 .../scala/kafka/producer/ConsoleProducer.scala  | 34 +++++++++------
 3 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/config/consumer.properties
----------------------------------------------------------------------
diff --git a/config/consumer.properties b/config/consumer.properties
index 1c43bf9..9dbd583 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -20,7 +20,7 @@
 zk.connect=127.0.0.1:2181
 
 # timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
 
 #consumer group id
 group.id=test-consumer-group

http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 9a9cd06..04408dd 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -105,7 +105,7 @@ log.cleanup.interval.mins=1
 zk.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
 
 # metrics reporter properties
 kafka.metrics.polling.interval.secs=5

http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 1a98174..8b77465 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -128,7 +128,7 @@ object ConsoleProducer {
     props.put("batch.num.messages", batchSize.toString)
     props.put("queue.buffering.max.ms", sendTimeout.toString)
     props.put("queue.buffering.max.messages", queueSize.toString)
-    props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
+    props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
     props.put("request.required.acks", requestRequiredAcks.toString)
     props.put("request.timeout.ms", requestTimeoutMs.toString)
     props.put("key.serializer.class", keyEncoderClass)
@@ -137,20 +137,26 @@ object ConsoleProducer {
     val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
     reader.init(System.in, cmdLineProps)
 
-    val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
+    try {
+      val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
 
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        producer.close()
-      }
-    })
-
-    var message: KeyedMessage[AnyRef, AnyRef] = null
-    do {
-      message = reader.readMessage()
-      if(message != null)
-        producer.send(message)
-    } while(message != null)
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run() {
+          producer.close()
+        }
+      })
+
+      var message: KeyedMessage[AnyRef, AnyRef] = null
+      do {
+        message = reader.readMessage()
+        if(message != null)
+          producer.send(message)
+      } while(message != null)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace
+        System.exit(1)
+    }
     System.exit(0)
   }