This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00cd9cd3f90 KAFKA-18003: Add test to make sure `Admin#deleteRecords` can handle the corrupted records (#17840)
00cd9cd3f90 is described below

commit 00cd9cd3f90c697ff6538bc51c2be0e6e509f8c2
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Mon Nov 18 17:36:01 2024 +0800

    KAFKA-18003: Add test to make sure `Admin#deleteRecords` can handle the corrupted records (#17840)
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 69 +++++++++++++++++++++-
 1 file changed, 68 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 1eba4dba0db..15925714968 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -18,6 +18,8 @@ package kafka.api
 
 import java.io.File
 import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Paths, StandardOpenOption}
 import java.lang.{Long => JLong}
 import java.time.{Duration => JDuration}
 import java.util.Arrays.asList
@@ -39,7 +41,9 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFi
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfigs, TopicConfig}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
+import org.apache.kafka.common.record.FileRecords
 import org.apache.kafka.common.requests.DeleteRecordsRequest
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
@@ -50,7 +54,7 @@ import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
 import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
 import org.apache.log4j.PropertyConfigurator
 import org.junit.jupiter.api.Assertions._
@@ -1543,6 +1547,69 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertNull(returnedOffsets.get(topicPartition))
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testDeleteRecordsAfterCorruptRecords(quorum: String, groupProtocol: String): Unit = {
+    val config = new Properties()
+    config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200")
+    createTopic(topic, numPartitions = 1, replicationFactor = 1, config)
+
+    client = createAdminClient
+
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    val producer = createProducer()
+    def sendRecords(begin: Int, end: Int) = {
+      val futures = (begin until end).map( i => {
+        val record = new ProducerRecord(topic, partition, s"$i".getBytes, s"$i".getBytes)
+        producer.send(record)
+      })
+      futures.foreach(_.get)
+    }
+    sendRecords(0, 10)
+    sendRecords(10, 20)
+
+    val topicDesc = client.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic)
+    assertEquals(1, topicDesc.partitions().size())
+    val partitionLeaderId = topicDesc.partitions().get(0).leader().id()
+    val logDirMap = client.describeLogDirs(Collections.singletonList(partitionLeaderId))
+      .allDescriptions().get().get(partitionLeaderId)
+    val logDir = logDirMap.entrySet.stream
+      .filter(entry => entry.getValue.replicaInfos.containsKey(topicPartition)).findAny().get().getKey
+    // retrieve the path of the first segment
+    val logFilePath = LogFileUtils.logFile(Paths.get(logDir).resolve(topicPartition.toString).toFile, 0).toPath
+    val firstSegmentRecordsSize = FileRecords.open(logFilePath.toFile).records().asScala.iterator.size
+    assertTrue(firstSegmentRecordsSize > 0)
+
+    // manually load the inactive segment file to corrupt the data
+    val originalContent = Files.readAllBytes(logFilePath)
+    val newContent = ByteBuffer.allocate(JLong.BYTES + Integer.BYTES + originalContent.length)
+    newContent.putLong(0) // offset
+    newContent.putInt(0) // size -> this will make FileLogInputStream throw "Found record size 0 smaller than minimum record..."
+    newContent.put(Files.readAllBytes(logFilePath))
+    newContent.flip()
+    Files.write(logFilePath, newContent.array(), StandardOpenOption.TRUNCATE_EXISTING)
+
+    consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    assertEquals("Encountered corrupt message when fetching offset 0 for topic-partition topic-0",
+      assertThrows(classOf[KafkaException], () => consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))).getMessage)
+
+    val partitionFollowerId = brokers.map(b => b.config.nodeId).filter(id => id != partitionLeaderId).head
+    val newAssignment = Map(topicPartition -> Optional.of(new NewPartitionReassignment(
+        List(Integer.valueOf(partitionLeaderId), Integer.valueOf(partitionFollowerId)).asJava))).asJava
+
+    // add follower to topic partition
+    client.alterPartitionReassignments(newAssignment).all().get()
+    // delete records in corrupt segment (the first segment)
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(firstSegmentRecordsSize)).asJava).all.get
+    // verify reassignment is finished after delete records
+    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(partitionLeaderId, partitionFollowerId))
+    // seek to beginning and make sure we can consume all records
+    consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    assertEquals(19, TestUtils.consumeRecords(consumer, 20 - firstSegmentRecordsSize).last.offset())
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testConsumeAfterDeleteRecords(quorum: String, groupProtocol: String): Unit = {

Reply via email to