Dragos Manolescu created KAFKA-807:
--------------------------------------
Summary: LineMessageReader doesn't correctly parse the key
separator
Key: KAFKA-807
URL: https://issues.apache.org/jira/browse/KAFKA-807
Project: Kafka
Issue Type: Bug
Components: tools
Affects Versions: 0.8
Reporter: Dragos Manolescu
Priority: Trivial
Fix For: 0.8
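A typo in the property name checked by LineMessageReader.init ("key.seperator" instead of "key.separator") means a key separator supplied as "key.separator" is never read, so the default tab separator is always used. The patch follows;
what is the recommended way to submit patches?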
Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
<+>/**\n * Licensed to the Apache Software Foundation (ASF) under one or more\n
* contributor license agreements. See the NOTICE file distributed with\n *
this work for additional information regarding copyright ownership.\n * The ASF
licenses this file to You under the Apache License, Version 2.0\n * (the
\"License\"); you may not use this file except in compliance with\n * the
License. You may obtain a copy of the License at\n * \n *
http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by
applicable law or agreed to in writing, software\n * distributed under the
License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the
specific language governing permissions and\n * limitations under the
License.\n */\n\npackage kafka.producer\n\nimport
scala.collection.JavaConversions._\nimport joptsimple._\nimport
java.util.Properties\nimport java.io._\nimport kafka.common._\nimport
kafka.message._\nimport kafka.serializer._\n\nobject ConsoleProducer { \n\n
def main(args: Array[String]) { \n val parser = new OptionParser\n val
topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce
messages to.\")\n .withRequiredArg\n
.describedAs(\"topic\")\n
.ofType(classOf[String])\n val brokerListOpt =
parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the form
HOST1:PORT1,HOST2:PORT2.\")\n .withRequiredArg\n
.describedAs(\"broker-list\")\n
.ofType(classOf[String])\n val syncOpt = parser.accepts(\"sync\", \"If set
message send requests to the brokers are synchronously, one at a time as they
arrive.\")\n val compressOpt = parser.accepts(\"compress\", \"If set,
messages batches are sent compressed\")\n val batchSizeOpt =
parser.accepts(\"batch-size\", \"Number of messages to send in a single batch
if they are not being sent synchronously.\")\n
.withRequiredArg\n .describedAs(\"size\")\n
.ofType(classOf[java.lang.Integer])\n
.defaultsTo(200)\n val sendTimeoutOpt = parser.accepts(\"timeout\",
\"If set and the producer is running in asynchronous mode, this gives the
maximum amount of time\" + \n
\" a message will queue awaiting suffient batch size. The value is given in
ms.\")\n .withRequiredArg\n
.describedAs(\"timeout_ms\")\n
.ofType(classOf[java.lang.Long])\n
.defaultsTo(1000)\n val queueSizeOpt = parser.accepts(\"queue-size\", \"If
set and the producer is running in asynchronous mode, this gives the maximum
amount of \" + \n \" messages
will queue awaiting suffient batch size.\")\n
.withRequiredArg\n .describedAs(\"queue_size\")\n
.ofType(classOf[java.lang.Long])\n
.defaultsTo(10000)\n val queueEnqueueTimeoutMsOpt =
parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n
.withRequiredArg\n
.describedAs(\"queue enqueuetimeout ms\")\n
.ofType(classOf[java.lang.Long])\n
.defaultsTo(0)\n val requestRequiredAcksOpt =
parser.accepts(\"request-required-acks\", \"The required acks of the producer
requests\")\n .withRequiredArg\n
.describedAs(\"request required acks\")\n
.ofType(classOf[java.lang.Integer])\n
.defaultsTo(0)\n val requestTimeoutMsOpt =
parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer
requests. Value must be non-negative and non-zero\")\n
.withRequiredArg\n .describedAs(\"request
timeout ms\")\n
.ofType(classOf[java.lang.Integer])\n
.defaultsTo(1500)\n val valueEncoderOpt =
parser.accepts(\"value-serializer\", \"The class name of the message encoder
implementation to use for serializing values.\")\n
.withRequiredArg\n
.describedAs(\"encoder_class\")\n
.ofType(classOf[java.lang.String])\n
.defaultsTo(classOf[StringEncoder].getName)\n val keyEncoderOpt =
parser.accepts(\"key-serializer\", \"The class name of the message encoder
implementation to use for serializing keys.\")\n
.withRequiredArg\n
.describedAs(\"encoder_class\")\n
.ofType(classOf[java.lang.String])\n
.defaultsTo(classOf[StringEncoder].getName)\n val messageReaderOpt =
parser.accepts(\"line-reader\", \"The class name of the class to use for
reading lines from standard in. \" + \n
\"By default each line is read as a separate message.\")\n
.withRequiredArg\n
.describedAs(\"reader_class\")\n
.ofType(classOf[java.lang.String])\n
.defaultsTo(classOf[LineMessageReader].getName)\n val socketBufferSizeOpt =
parser.accepts(\"socket-buffer-size\", \"The size of the tcp RECV size.\")\n
.withRequiredArg\n
.describedAs(\"size\")\n
.ofType(classOf[java.lang.Integer])\n
.defaultsTo(1024*100)\n val propertyOpt = parser.accepts(\"property\", \"A
mechanism to pass user-defined properties in the form key=value to the message
reader. \" +\n \"This allows
custom configuration for a user-defined message reader.\")\n
.withRequiredArg\n .describedAs(\"prop\")\n
.ofType(classOf[String])\n\n\n val options =
parser.parse(args : _*)\n for(arg <- List(topicOpt, brokerListOpt)) {\n
if(!options.has(arg)) {\n System.err.println(\"Missing required argument
\\\"\" + arg + \"\\\"\")\n parser.printHelpOn(System.err)\n
System.exit(1)\n }\n }\n\n val topic = options.valueOf(topicOpt)\n
val brokerList = options.valueOf(brokerListOpt)\n val sync =
options.has(syncOpt)\n val compress = options.has(compressOpt)\n val
batchSize = options.valueOf(batchSizeOpt)\n val sendTimeout =
options.valueOf(sendTimeoutOpt)\n val queueSize =
options.valueOf(queueSizeOpt)\n val queueEnqueueTimeoutMs =
options.valueOf(queueEnqueueTimeoutMsOpt)\n val requestRequiredAcks =
options.valueOf(requestRequiredAcksOpt)\n val requestTimeoutMs =
options.valueOf(requestTimeoutMsOpt)\n val keyEncoderClass =
options.valueOf(keyEncoderOpt)\n val valueEncoderClass =
options.valueOf(valueEncoderOpt)\n val readerClass =
options.valueOf(messageReaderOpt)\n val socketBuffer =
options.valueOf(socketBufferSizeOpt)\n val cmdLineProps =
parseLineReaderArgs(options.valuesOf(propertyOpt))\n
cmdLineProps.put(\"topic\", topic)\n\n val props = new Properties()\n
props.put(\"broker.list\", brokerList)\n val codec = if(compress)
DefaultCompressionCodec.codec else NoCompressionCodec.codec\n
props.put(\"compression.codec\", codec.toString)\n
props.put(\"producer.type\", if(sync) \"sync\" else \"async\")\n
if(options.has(batchSizeOpt))\n props.put(\"batch.num.messages\",
batchSize.toString)\n props.put(\"queue.buffering.max.ms\",
sendTimeout.toString)\n props.put(\"queue.buffering.max.messages\",
queueSize.toString)\n props.put(\"queue.enqueue.timeout.ms\",
queueEnqueueTimeoutMs.toString)\n props.put(\"request.required.acks\",
requestRequiredAcks.toString)\n props.put(\"request.timeout.ms\",
requestTimeoutMs.toString)\n props.put(\"key.serializer.class\",
keyEncoderClass)\n props.put(\"serializer.class\", valueEncoderClass)\n
props.put(\"send.buffer.bytes\", socketBuffer.toString)\n val reader =
Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef,
AnyRef]]\n reader.init(System.in, cmdLineProps)\n\n try {\n val
producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))\n\n
Runtime.getRuntime.addShutdownHook(new Thread() {\n override def run()
{\n producer.close()\n }\n })\n\n var
message: KeyedMessage[AnyRef, AnyRef] = null\n do {\n message =
reader.readMessage()\n if(message != null)\n
producer.send(message)\n } while(message != null)\n } catch {\n
case e: Exception =>\n e.printStackTrace\n System.exit(1)\n
}\n System.exit(0)\n }\n\n def parseLineReaderArgs(args:
Iterable[String]): Properties = {\n val splits = args.map(_ split
\"=\").filterNot(_ == null).filterNot(_.length == 0)\n
if(!splits.forall(_.length == 2)) {\n System.err.println(\"Invalid line
reader properties: \" + args.mkString(\" \"))\n System.exit(1)\n }\n
val props = new Properties\n for(a <- splits)\n props.put(a(0), a(1))\n
props\n }\n\n trait MessageReader[K,V] { \n def init(inputStream:
InputStream, props: Properties) {}\n def readMessage(): KeyedMessage[K,V]\n
def close() {}\n }\n\n class LineMessageReader extends
MessageReader[String, String] {\n var topic: String = null\n var reader:
BufferedReader = null\n var parseKey = false\n var keySeparator =
\"\\t\"\n var ignoreError = false\n var lineNumber = 0\n\n override
def init(inputStream: InputStream, props: Properties) {\n topic =
props.getProperty(\"topic\")\n if(props.containsKey(\"parse.key\"))\n
parseKey =
props.getProperty(\"parse.key\").trim.toLowerCase.equals(\"true\")\n
if(props.containsKey(\"key.seperator\"))\n keySeparator =
props.getProperty(\"key.separator\")\n
if(props.containsKey(\"ignore.error\"))\n ignoreError =
props.getProperty(\"ignore.error\").trim.toLowerCase.equals(\"true\")\n
reader = new BufferedReader(new InputStreamReader(inputStream))\n }\n\n
override def readMessage() = {\n lineNumber += 1\n
(reader.readLine(), parseKey) match {\n case (null, _) => null\n
case (line, true) =>\n line.indexOf(keySeparator) match {\n
case -1 =>\n if(ignoreError)\n new
KeyedMessage(topic, line)\n else\n throw new
KafkaException(\"No key found on line \" + lineNumber + \": \" + line)\n
case n =>\n new KeyedMessage(topic,\n
line.substring(0, n), \n if(n +
keySeparator.size > line.size) \"\" else line.substring(n +
keySeparator.size))\n }\n case (line, false) =>\n new
KeyedMessage(topic, line)\n }\n }\n }\n}\n
===================================================================
--- core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision
290d5e0eac38e9917c64353a131154821b899f26)
+++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision )
@@ -196,7 +196,7 @@
topic = props.getProperty("topic")
if(props.containsKey("parse.key"))
parseKey =
props.getProperty("parse.key").trim.toLowerCase.equals("true")
- if(props.containsKey("key.seperator"))
+ if(props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator")
if(props.containsKey("ignore.error"))
ignoreError =
props.getProperty("ignore.error").trim.toLowerCase.equals("true")
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira