This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e9416ee  MINOR: Remove redundant fields in dump log record output 
(#11101)
e9416ee is described below

commit e9416eed2d66143b3d8e0394a1f1859a2b962280
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jul 23 15:56:41 2021 -0700

    MINOR: Remove redundant fields in dump log record output (#11101)
    
    In 2.8, the dump log output regressed to print batch level information for 
each record, which makes the output much noisier. This patch changes the output 
to what it was in 2.7 and previous versions. We only print batch metadata at 
the batch level.
    
    Reviewers: David Arthur <[email protected]>, Ismael Juma <[email protected]>
---
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  11 +-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     | 145 ++++++++++++++++++++-
 2 files changed, 146 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index c495dbc..e097206 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -268,13 +268,10 @@ object DumpLogSegments {
             }
             lastOffset = record.offset
 
-            var prefix = s"${RecordIndent} "
+            var prefix = s"$RecordIndent "
             if (!skipRecordMetadata) {
-              print(s"${prefix}offset: ${record.offset}" +
-                  s" keySize: ${record.keySize} valueSize: ${record.valueSize} 
${batch.timestampType}: ${record.timestamp}" +
-                  s" baseOffset: ${batch.baseOffset} lastOffset: 
${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
-                  s" lastSequence: ${batch.lastSequence} producerEpoch: 
${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
-                  s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} 
compressType: ${batch.compressionType} position: ${validBytes}")
+              print(s"${prefix}offset: ${record.offset} 
${batch.timestampType}: ${record.timestamp} " +
+                s"keySize: ${record.keySize} valueSize: ${record.valueSize}")
               prefix = " "
 
               if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
@@ -327,7 +324,7 @@ object DumpLogSegments {
 
     println(" position: " + accumulativeBytes + " " + batch.timestampType + ": 
" + batch.maxTimestamp +
       " size: " + batch.sizeInBytes + " magic: " + batch.magic +
-      " compresscodec: " + batch.compressionType + " crc: " + batch.checksum + 
" isvalid: " + batch.isValid)
+      " compresscodec: " + batch.compressionType.name + " crc: " + 
batch.checksum + " isvalid: " + batch.isValid)
   }
 
   class TimeIndexDumpErrors {
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index bcfe754..ea3304d 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -21,20 +21,22 @@ import java.io.{ByteArrayOutputStream, File, PrintWriter}
 import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
-import kafka.log.{Log, LogConfig, LogManager, LogTestUtils}
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+
+import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, LogTestUtils}
+import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, 
ObjectSerializationCache}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, ControlRecordType, 
EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
+import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -89,6 +91,40 @@ class DumpLogSegmentsTest {
   }
 
   @Test
+  def testBatchAndRecordMetadataOutput(): Unit = {
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+      new SimpleRecord("a".getBytes),
+      new SimpleRecord("b".getBytes)
+    ), leaderEpoch = 0)
+
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, 0,
+      new SimpleRecord(time.milliseconds(), "c".getBytes, "1".getBytes),
+      new SimpleRecord("d".getBytes)
+    ), leaderEpoch = 3)
+
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+      new SimpleRecord("e".getBytes, null),
+      new SimpleRecord(null, "f".getBytes),
+      new SimpleRecord("g".getBytes)
+    ), leaderEpoch = 3)
+
+    
log.appendAsLeader(MemoryRecords.withIdempotentRecords(CompressionType.NONE, 
29342342L, 15.toShort, 234123,
+      new SimpleRecord("h".getBytes)
+    ), leaderEpoch = 3)
+
+    
log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.GZIP, 
98323L, 99.toShort, 266,
+      new SimpleRecord("i".getBytes),
+      new SimpleRecord("j".getBytes)
+    ), leaderEpoch = 5)
+
+    log.appendAsLeader(MemoryRecords.withEndTransactionMarker(98323L, 
99.toShort,
+      new EndTransactionMarker(ControlRecordType.COMMIT, 100)
+    ), origin = AppendOrigin.Coordinator, leaderEpoch = 7)
+
+    assertDumpLogRecordMetadata()
+  }
+
+  @Test
   def testPrintDataLog(): Unit = {
     addSimpleRecords()
     def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: 
Array[String]): Unit = {
@@ -251,4 +287,107 @@ class DumpLogSegmentsTest {
     }
     outContent.toString
   }
+
+  private def readBatchMetadata(lines: util.ListIterator[String]): 
Option[String] = {
+    while (lines.hasNext) {
+      val line = lines.next()
+      if (line.startsWith("|")) {
+        throw new IllegalStateException("Read unexpected record entry")
+      } else if (line.startsWith("baseOffset")) {
+        return Some(line)
+      }
+    }
+    None
+  }
+
+  private def readBatchRecords(lines: util.ListIterator[String]): Seq[String] 
= {
+    val records = mutable.ArrayBuffer.empty[String]
+    while (lines.hasNext) {
+      val line = lines.next()
+      if (line.startsWith("|")) {
+        records += line.substring(1)
+      } else {
+        lines.previous()
+        return records.toSeq
+      }
+    }
+    records.toSeq
+  }
+
+  private def parseMetadataFields(line: String): Map[String, String] = {
+    val fields = mutable.Map.empty[String, String]
+    val tokens = line.split("\\s+").map(_.trim()).filter(_.nonEmpty).iterator
+
+    while (tokens.hasNext) {
+      val token = tokens.next()
+      if (!token.endsWith(":")) {
+        throw new IllegalStateException(s"Unexpected non-field token $token")
+      }
+
+      val field = token.substring(0, token.length - 1)
+      if (!tokens.hasNext) {
+        throw new IllegalStateException(s"Failed to parse value for $field")
+      }
+
+      val value = tokens.next()
+      fields += field -> value
+    }
+
+    fields.toMap
+  }
+
+  private def assertDumpLogRecordMetadata(): Unit = {
+    val logReadInfo = log.read(
+      startOffset = 0,
+      maxLength = Int.MaxValue,
+      isolation = FetchLogEnd,
+      minOneMessage = true
+    )
+
+    val output = runDumpLogSegments(Array("--deep-iteration", "--files", 
logFilePath))
+    val lines = util.Arrays.asList(output.split("\n"): _*).listIterator()
+
+    for (batch <- logReadInfo.records.batches.asScala) {
+      val parsedBatchOpt = readBatchMetadata(lines)
+      assertTrue(parsedBatchOpt.isDefined)
+
+      val parsedBatch = parseMetadataFields(parsedBatchOpt.get)
+      assertEquals(Some(batch.baseOffset), 
parsedBatch.get("baseOffset").map(_.toLong))
+      assertEquals(Some(batch.lastOffset), 
parsedBatch.get("lastOffset").map(_.toLong))
+      assertEquals(Option(batch.countOrNull), 
parsedBatch.get("count").map(_.toLong))
+      assertEquals(Some(batch.partitionLeaderEpoch), 
parsedBatch.get("partitionLeaderEpoch").map(_.toInt))
+      assertEquals(Some(batch.isTransactional), 
parsedBatch.get("isTransactional").map(_.toBoolean))
+      assertEquals(Some(batch.isControlBatch), 
parsedBatch.get("isControl").map(_.toBoolean))
+      assertEquals(Some(batch.producerId), 
parsedBatch.get("producerId").map(_.toLong))
+      assertEquals(Some(batch.producerEpoch), 
parsedBatch.get("producerEpoch").map(_.toShort))
+      assertEquals(Some(batch.baseSequence), 
parsedBatch.get("baseSequence").map(_.toInt))
+      assertEquals(Some(batch.compressionType.name), 
parsedBatch.get("compresscodec"))
+
+      val parsedRecordIter = readBatchRecords(lines).iterator
+      for (record <- batch.asScala) {
+        assertTrue(parsedRecordIter.hasNext)
+        val parsedRecord = parseMetadataFields(parsedRecordIter.next())
+        assertEquals(Some(record.offset), 
parsedRecord.get("offset").map(_.toLong))
+        assertEquals(Some(record.keySize), 
parsedRecord.get("keySize").map(_.toInt))
+        assertEquals(Some(record.valueSize), 
parsedRecord.get("valueSize").map(_.toInt))
+        assertEquals(Some(record.timestamp), 
parsedRecord.get(batch.timestampType.name).map(_.toLong))
+
+        if (batch.magic >= RecordVersion.V2.value) {
+          assertEquals(Some(record.sequence), 
parsedRecord.get("sequence").map(_.toInt))
+        }
+
+        // Batch fields should not be present in the record output
+        assertEquals(None, parsedRecord.get("baseOffset"))
+        assertEquals(None, parsedRecord.get("lastOffset"))
+        assertEquals(None, parsedRecord.get("partitionLeaderEpoch"))
+        assertEquals(None, parsedRecord.get("producerId"))
+        assertEquals(None, parsedRecord.get("producerEpoch"))
+        assertEquals(None, parsedRecord.get("baseSequence"))
+        assertEquals(None, parsedRecord.get("isTransactional"))
+        assertEquals(None, parsedRecord.get("isControl"))
+        assertEquals(None, parsedRecord.get("compresscodec"))
+      }
+    }
+  }
+
 }

Reply via email to