Updated Branches: refs/heads/0.8 46ebdc16e -> a376f9221
Support deep iteration in DumpLogSegments tool; patched by Jun Rao; reviewed by Neha Narkhede; kafka-812 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a376f922 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a376f922 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a376f922 Branch: refs/heads/0.8 Commit: a376f922149330995f91d427d76ff1595fbd26ce Parents: 46ebdc1 Author: Jun Rao <jun...@gmail.com> Authored: Tue Mar 19 16:32:59 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Mar 19 16:32:59 2013 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/tools/DumpLogSegments.scala | 98 +++++++++++---- 1 files changed, 71 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a376f922/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 1bed554..06e6437 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -35,6 +35,12 @@ object DumpLogSegments { .withRequiredArg .describedAs("file1, file2, ...") .ofType(classOf[String]) + val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(5 * 1024 * 1024) + val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration") val options = parser.parse(args : _*) if(!options.has(filesOpt)) { @@ -46,6 +52,8 @@ object DumpLogSegments { val print = if(options.has(printOpt)) true else false val verifyOnly = if(options.has(verifyOpt)) true else false val files = options.valueOf(filesOpt).split(",") + val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue() + val isDeepIteration = if(options.has(deepIterationOpt)) true else false val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]] val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]] @@ -54,17 +62,17 @@ object DumpLogSegments { val file = new File(arg) if(file.getName.endsWith(Log.LogFileSuffix)) { println("Dumping " + file) - dumpLog(file, print, nonConsecutivePairsForLogFilesMap) + dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration) } else if(file.getName.endsWith(Log.IndexFileSuffix)) { println("Dumping " + file) - dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap) + dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) } } misMatchesForIndexFilesMap.foreach { case (fileName, listOfMismatches) => { System.err.println("Mismatches in :" + fileName) listOfMismatches.foreach(m => { - System.err.println(" Index position: %d, log position: %d".format(m._1, m._2)) + System.err.println(" Index offset: %d, log offset: %d".format(m._1, m._2)) }) } } @@ -79,7 +87,10 @@ object DumpLogSegments { } /* print out the contents of the index */ - private def dumpIndex(file: File, verifyOnly: Boolean, misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]]) { + private def dumpIndex(file: File, + verifyOnly: Boolean, + misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]], + maxMessageSize: Int) { val startOffset = file.getName().split("\\.")(0).toLong val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix val logFile = new File(logFileName) @@ -87,8 +98,8 @@ object DumpLogSegments { val index = new OffsetIndex(file = file, baseOffset = startOffset) for(i <- 0 until index.entries) { val entry = index.entry(i) - val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes()) - val messageAndOffset = partialFileMessageSet.head + val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize) + val messageAndOffset = getIterator(partialFileMessageSet.head, isDeepIteration = true).next() if(messageAndOffset.offset != entry.offset + index.baseOffset) { var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getName, List[(Long, Long)]()) misMatchesSeq ::=(entry.offset + index.baseOffset, messageAndOffset.offset) @@ -103,40 +114,73 @@ object DumpLogSegments { } /* print out the contents of the log */ - private def dumpLog(file: File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]]) { + private def dumpLog(file: File, + printContents: Boolean, + nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]], + isDeepIteration: Boolean) { val startOffset = file.getName().split("\\.")(0).toLong println("Starting offset: " + startOffset) val messageSet = new FileMessageSet(file) var validBytes = 0L var lastOffset = -1l - for(messageAndOffset <- messageSet) { - val msg = messageAndOffset.message + for(shallowMessageAndOffset <- messageSet) { // this only does shallow iteration + val itr = getIterator(shallowMessageAndOffset, isDeepIteration) + for (messageAndOffset <- itr) { + val msg = messageAndOffset.message - if(lastOffset == -1) + if(lastOffset == -1) + lastOffset = messageAndOffset.offset + // If we are iterating uncompressed messages, offsets must be consecutive + else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) { + var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]()) + nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset) + nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq) + } lastOffset = messageAndOffset.offset - // If it's uncompressed message, its offset must be lastOffset + 1 no matter last message is compressed or uncompressed - else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) { - var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]()) - nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset) - nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq) - } - lastOffset = messageAndOffset.offset - print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid + - " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + - " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum) - validBytes += MessageSet.entrySize(msg) - if(msg.hasKey) - print(" keysize: " + msg.keySize) - if(printContents) { + print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid + + " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + + " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum) if(msg.hasKey) - print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) - print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) + print(" keysize: " + msg.keySize) + if(printContents) { + if(msg.hasKey) + print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) + print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) + } + println() } - println() + validBytes += MessageSet.entrySize(shallowMessageAndOffset.message) } val trailingBytes = messageSet.sizeInBytes - validBytes if(trailingBytes > 0) println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName)) } + + private def getIterator(messageAndOffset: MessageAndOffset, isDeepIteration: Boolean) = { + if (isDeepIteration) { + val message = messageAndOffset.message + message.compressionCodec match { + case NoCompressionCodec => + getSingleMessageIterator(messageAndOffset) + case _ => + ByteBufferMessageSet.decompress(message).iterator + } + } else + getSingleMessageIterator(messageAndOffset) + } + + private def getSingleMessageIterator(messageAndOffset: MessageAndOffset) = { + new IteratorTemplate[MessageAndOffset] { + var messageIterated = false + + override def makeNext(): MessageAndOffset = { + if (!messageIterated) { + messageIterated = true + messageAndOffset + } else + allDone() + } + } + } }