This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new e9416ee MINOR: Remove redundant fields in dump log record output (#11101)
e9416ee is described below
commit e9416eed2d66143b3d8e0394a1f1859a2b962280
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jul 23 15:56:41 2021 -0700
MINOR: Remove redundant fields in dump log record output (#11101)
In 2.8, the dump log output regressed to printing batch-level information for
each record, which makes the output much noisier. This patch restores the output
used in 2.7 and earlier versions: batch metadata is printed only at the batch
level.
Reviewers: David Arthur <[email protected]>, Ismael Juma <[email protected]>
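For context, the intended shape of the deep-iteration output after this patch is roughly the
following sketch. Offsets, timestamps, sizes, and the exact field order are illustrative and the
batch-level field list is abridged; the field names are the ones printed by the code in the patch
below:

    baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false CreateTime: 1627080000000 size: 76 magic: 2 compresscodec: NONE crc: 1234567890 isvalid: true
    | offset: 0 CreateTime: 1627080000000 keySize: -1 valueSize: 1
    | offset: 1 CreateTime: 1627080000000 keySize: -1 valueSize: 1

Batch metadata appears once on the unprefixed line; each record line (prefixed with "|") now carries
only record-level fields, with additional record fields such as sequence following for message
format v2.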
---
.../main/scala/kafka/tools/DumpLogSegments.scala | 11 +-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 145 ++++++++++++++++++++-
2 files changed, 146 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index c495dbc..e097206 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -268,13 +268,10 @@ object DumpLogSegments {
}
lastOffset = record.offset
- var prefix = s"${RecordIndent} "
+ var prefix = s"$RecordIndent "
if (!skipRecordMetadata) {
- print(s"${prefix}offset: ${record.offset}" +
- s" keySize: ${record.keySize} valueSize: ${record.valueSize}
${batch.timestampType}: ${record.timestamp}" +
- s" baseOffset: ${batch.baseOffset} lastOffset:
${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
- s" lastSequence: ${batch.lastSequence} producerEpoch:
${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
- s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic}
compressType: ${batch.compressionType} position: ${validBytes}")
+ print(s"${prefix}offset: ${record.offset}
${batch.timestampType}: ${record.timestamp} " +
+ s"keySize: ${record.keySize} valueSize: ${record.valueSize}")
prefix = " "
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
@@ -327,7 +324,7 @@ object DumpLogSegments {
println(" position: " + accumulativeBytes + " " + batch.timestampType + ":
" + batch.maxTimestamp +
" size: " + batch.sizeInBytes + " magic: " + batch.magic +
- " compresscodec: " + batch.compressionType + " crc: " + batch.checksum +
" isvalid: " + batch.isValid)
+ " compresscodec: " + batch.compressionType.name + " crc: " +
batch.checksum + " isvalid: " + batch.isValid)
}
class TimeIndexDumpErrors {
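As a side note on how this output is produced: the batch and record lines come from
kafka.tools.DumpLogSegments run with deep iteration over a segment file. A minimal sketch of a
programmatic invocation, mirroring the flags the new test passes to runDumpLogSegments (the segment
path below is a hypothetical placeholder, not part of this patch):

    import kafka.tools.DumpLogSegments

    // Dump batch metadata plus per-record metadata for one log segment.
    // --deep-iteration walks the individual records inside each batch;
    // the file path is an example segment, adjust to a real log directory.
    DumpLogSegments.main(Array(
      "--deep-iteration",
      "--files", "/tmp/kafka-logs/topic-0/00000000000000000000.log"
    ))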
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index bcfe754..ea3304d 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -21,20 +21,22 @@ import java.io.{ByteArrayOutputStream, File, PrintWriter}
import java.nio.ByteBuffer
import java.util
import java.util.Properties
-import kafka.log.{Log, LogConfig, LogManager, LogTestUtils}
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+
+import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, LogTestUtils}
+import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -89,6 +91,40 @@ class DumpLogSegmentsTest {
}
@Test
+ def testBatchAndRecordMetadataOutput(): Unit = {
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+ new SimpleRecord("a".getBytes),
+ new SimpleRecord("b".getBytes)
+ ), leaderEpoch = 0)
+
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, 0,
+ new SimpleRecord(time.milliseconds(), "c".getBytes, "1".getBytes),
+ new SimpleRecord("d".getBytes)
+ ), leaderEpoch = 3)
+
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+ new SimpleRecord("e".getBytes, null),
+ new SimpleRecord(null, "f".getBytes),
+ new SimpleRecord("g".getBytes)
+ ), leaderEpoch = 3)
+
+ log.appendAsLeader(MemoryRecords.withIdempotentRecords(CompressionType.NONE, 29342342L, 15.toShort, 234123,
+ new SimpleRecord("h".getBytes)
+ ), leaderEpoch = 3)
+
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.GZIP, 98323L, 99.toShort, 266,
+ new SimpleRecord("i".getBytes),
+ new SimpleRecord("j".getBytes)
+ ), leaderEpoch = 5)
+
+ log.appendAsLeader(MemoryRecords.withEndTransactionMarker(98323L, 99.toShort,
+ new EndTransactionMarker(ControlRecordType.COMMIT, 100)
+ ), origin = AppendOrigin.Coordinator, leaderEpoch = 7)
+
+ assertDumpLogRecordMetadata()
+ }
+
+ @Test
def testPrintDataLog(): Unit = {
addSimpleRecords()
def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: Array[String]): Unit = {
@@ -251,4 +287,107 @@ class DumpLogSegmentsTest {
}
outContent.toString
}
+
+ private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = {
+ while (lines.hasNext) {
+ val line = lines.next()
+ if (line.startsWith("|")) {
+ throw new IllegalStateException("Read unexpected record entry")
+ } else if (line.startsWith("baseOffset")) {
+ return Some(line)
+ }
+ }
+ None
+ }
+
+ private def readBatchRecords(lines: util.ListIterator[String]): Seq[String] = {
+ val records = mutable.ArrayBuffer.empty[String]
+ while (lines.hasNext) {
+ val line = lines.next()
+ if (line.startsWith("|")) {
+ records += line.substring(1)
+ } else {
+ lines.previous()
+ return records.toSeq
+ }
+ }
+ records.toSeq
+ }
+
+ private def parseMetadataFields(line: String): Map[String, String] = {
+ val fields = mutable.Map.empty[String, String]
+ val tokens = line.split("\\s+").map(_.trim()).filter(_.nonEmpty).iterator
+
+ while (tokens.hasNext) {
+ val token = tokens.next()
+ if (!token.endsWith(":")) {
+ throw new IllegalStateException(s"Unexpected non-field token $token")
+ }
+
+ val field = token.substring(0, token.length - 1)
+ if (!tokens.hasNext) {
+ throw new IllegalStateException(s"Failed to parse value for $field")
+ }
+
+ val value = tokens.next()
+ fields += field -> value
+ }
+
+ fields.toMap
+ }
+
+ private def assertDumpLogRecordMetadata(): Unit = {
+ val logReadInfo = log.read(
+ startOffset = 0,
+ maxLength = Int.MaxValue,
+ isolation = FetchLogEnd,
+ minOneMessage = true
+ )
+
+ val output = runDumpLogSegments(Array("--deep-iteration", "--files",
logFilePath))
+ val lines = util.Arrays.asList(output.split("\n"): _*).listIterator()
+
+ for (batch <- logReadInfo.records.batches.asScala) {
+ val parsedBatchOpt = readBatchMetadata(lines)
+ assertTrue(parsedBatchOpt.isDefined)
+
+ val parsedBatch = parseMetadataFields(parsedBatchOpt.get)
+ assertEquals(Some(batch.baseOffset), parsedBatch.get("baseOffset").map(_.toLong))
+ assertEquals(Some(batch.lastOffset), parsedBatch.get("lastOffset").map(_.toLong))
+ assertEquals(Option(batch.countOrNull), parsedBatch.get("count").map(_.toLong))
+ assertEquals(Some(batch.partitionLeaderEpoch), parsedBatch.get("partitionLeaderEpoch").map(_.toInt))
+ assertEquals(Some(batch.isTransactional), parsedBatch.get("isTransactional").map(_.toBoolean))
+ assertEquals(Some(batch.isControlBatch), parsedBatch.get("isControl").map(_.toBoolean))
+ assertEquals(Some(batch.producerId), parsedBatch.get("producerId").map(_.toLong))
+ assertEquals(Some(batch.producerEpoch), parsedBatch.get("producerEpoch").map(_.toShort))
+ assertEquals(Some(batch.baseSequence), parsedBatch.get("baseSequence").map(_.toInt))
+ assertEquals(Some(batch.compressionType.name), parsedBatch.get("compresscodec"))
+
+ val parsedRecordIter = readBatchRecords(lines).iterator
+ for (record <- batch.asScala) {
+ assertTrue(parsedRecordIter.hasNext)
+ val parsedRecord = parseMetadataFields(parsedRecordIter.next())
+ assertEquals(Some(record.offset), parsedRecord.get("offset").map(_.toLong))
+ assertEquals(Some(record.keySize), parsedRecord.get("keySize").map(_.toInt))
+ assertEquals(Some(record.valueSize), parsedRecord.get("valueSize").map(_.toInt))
+ assertEquals(Some(record.timestamp), parsedRecord.get(batch.timestampType.name).map(_.toLong))
+
+ if (batch.magic >= RecordVersion.V2.value) {
+ assertEquals(Some(record.sequence), parsedRecord.get("sequence").map(_.toInt))
+ }
+
+ // Batch fields should not be present in the record output
+ assertEquals(None, parsedRecord.get("baseOffset"))
+ assertEquals(None, parsedRecord.get("lastOffset"))
+ assertEquals(None, parsedRecord.get("partitionLeaderEpoch"))
+ assertEquals(None, parsedRecord.get("producerId"))
+ assertEquals(None, parsedRecord.get("producerEpoch"))
+ assertEquals(None, parsedRecord.get("baseSequence"))
+ assertEquals(None, parsedRecord.get("isTransactional"))
+ assertEquals(None, parsedRecord.get("isControl"))
+ assertEquals(None, parsedRecord.get("compresscodec"))
+ }
+ }
+ }
+
}