This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 00cd9cd3f90 KAFKA-18003: Add test to make sure `Admin#deleteRecords` can handle the corrupted records (#17840)
00cd9cd3f90 is described below
commit 00cd9cd3f90c697ff6538bc51c2be0e6e509f8c2
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Mon Nov 18 17:36:01 2024 +0800
KAFKA-18003: Add test to make sure `Admin#deleteRecords` can handle the corrupted records (#17840)
Reviewers: Divij Vaidya <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 69 +++++++++++++++++++++-
1 file changed, 68 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 1eba4dba0db..15925714968 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -18,6 +18,8 @@ package kafka.api
import java.io.File
import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Paths, StandardOpenOption}
import java.lang.{Long => JLong}
import java.time.{Duration => JDuration}
import java.util.Arrays.asList
@@ -39,7 +41,9 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFi
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfigs, TopicConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
+import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.requests.DeleteRecordsRequest
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
@@ -50,7 +54,7 @@ import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
import org.apache.log4j.PropertyConfigurator
import org.junit.jupiter.api.Assertions._
@@ -1543,6 +1547,69 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertNull(returnedOffsets.get(topicPartition))
}
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testDeleteRecordsAfterCorruptRecords(quorum: String, groupProtocol: String): Unit = {
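+    // a 200-byte segment limit forces the log to roll after almost every batch,
+    // so the first segment (base offset 0) soon becomes inactive and can be
+    // corrupted on disk without racing against the active segment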
+    val config = new Properties()
+    config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200")
+    createTopic(topic, numPartitions = 1, replicationFactor = 1, config)
+
+    client = createAdminClient
+
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    val producer = createProducer()
+    def sendRecords(begin: Int, end: Int) = {
+      val futures = (begin until end).map( i => {
+        val record = new ProducerRecord(topic, partition, s"$i".getBytes, s"$i".getBytes)
+        producer.send(record)
+      })
+      futures.foreach(_.get)
+    }
+    sendRecords(0, 10)
+    sendRecords(10, 20)
+
+    val topicDesc = client.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic)
+    assertEquals(1, topicDesc.partitions().size())
+    val partitionLeaderId = topicDesc.partitions().get(0).leader().id()
+    val logDirMap = client.describeLogDirs(Collections.singletonList(partitionLeaderId))
+      .allDescriptions().get().get(partitionLeaderId)
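+    // pick the leader's log directory that holds a replica of the test partition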
+    val logDir = logDirMap.entrySet.stream
+      .filter(entry => entry.getValue.replicaInfos.containsKey(topicPartition)).findAny().get().getKey
+    // retrieve the path of the first segment
+    val logFilePath = LogFileUtils.logFile(Paths.get(logDir).resolve(topicPartition.toString).toFile, 0).toPath
+    val firstSegmentRecordsSize = FileRecords.open(logFilePath.toFile).records().asScala.iterator.size
+    assertTrue(firstSegmentRecordsSize > 0)
+
+    // manually rewrite the inactive segment file to corrupt its data
+    val originalContent = Files.readAllBytes(logFilePath)
+    val newContent = ByteBuffer.allocate(JLong.BYTES + Integer.BYTES + originalContent.length)
+    newContent.putLong(0) // offset
+    newContent.putInt(0) // size -> this will make FileLogInputStream throw "Found record size 0 smaller than minimum record..."
+    newContent.put(originalContent)
+    newContent.flip()
+    Files.write(logFilePath, newContent.array(), StandardOpenOption.TRUNCATE_EXISTING)
+
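+    // fetching from offset 0 must now fail, since the consumer hits the bogus
+    // zero-size batch header at the start of the segment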
+    consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    assertEquals("Encountered corrupt message when fetching offset 0 for topic-partition topic-0",
+      assertThrows(classOf[KafkaException], () => consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))).getMessage)
+
+    val partitionFollowerId = brokers.map(b => b.config.nodeId).filter(id => id != partitionLeaderId).head
+    val newAssignment = Map(topicPartition -> Optional.of(new NewPartitionReassignment(
+      List(Integer.valueOf(partitionLeaderId), Integer.valueOf(partitionFollowerId)).asJava))).asJava
+
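+    // add a follower via reassignment; it can only sync once deleteRecords has
+    // dropped the corrupted head of the log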
+    // add follower to topic partition
+    client.alterPartitionReassignments(newAssignment).all().get()
+    // delete records in the corrupted segment (the first segment)
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(firstSegmentRecordsSize)).asJava).all.get
+    // verify the reassignment finishes after the records are deleted
+    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(partitionLeaderId, partitionFollowerId))
+    // seek to the beginning and make sure we can consume all remaining records
+    consumer.seekToBeginning(Collections.singletonList(topicPartition))
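+    // offsets [0, firstSegmentRecordsSize) were deleted, so 20 - firstSegmentRecordsSize
+    // records remain and the last consumed record still has offset 19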
+    assertEquals(19, TestUtils.consumeRecords(consumer, 20 - firstSegmentRecordsSize).last.offset())
+  }
+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeAfterDeleteRecords(quorum: String, groupProtocol: String): Unit = {