This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c01fd0b763 KAFKA-18949 add consumer protocol to
testDeleteRecordsAfterCorruptRecords (#19317)
5c01fd0b763 is described below
commit 5c01fd0b763ee154d2bc1bdbb6614f4bcfd9a054
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Apr 3 13:24:25 2025 +0800
KAFKA-18949 add consumer protocol to testDeleteRecordsAfterCorruptRecords
(#19317)
The `PlaintextAdminIntegrationTest#testDeleteRecordsAfterCorruptRecords`
test was only enabled for the classic group protocol. Enable it for the
consumer group protocol as well.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../integration/kafka/api/PlaintextAdminIntegrationTest.scala | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index fa563971edb..d1e04f8197a 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1565,7 +1565,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-
@MethodSource(Array("getTestGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
def testDeleteRecordsAfterCorruptRecords(groupProtocol: String): Unit = {
val config = new Properties()
config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200")
@@ -1573,9 +1573,6 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
client = createAdminClient
- val consumer = createConsumer()
- subscribeAndWaitForAssignment(topic, consumer)
-
val producer = createProducer()
def sendRecords(begin: Int, end: Int) = {
val futures = (begin until end).map( i => {
@@ -1608,7 +1605,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
newContent.flip()
Files.write(logFilePath, newContent.array(),
StandardOpenOption.TRUNCATE_EXISTING)
- consumer.seekToBeginning(Collections.singletonList(topicPartition))
+ val overrideConfig = new Properties
+ overrideConfig.setProperty("auto.offset.reset", "earliest")
+ val consumer = createConsumer(configOverrides = overrideConfig)
+ consumer.subscribe(Seq(topic).asJava)
assertEquals("Encountered corrupt message when fetching offset 0 for
topic-partition topic-0",
assertThrows(classOf[KafkaException], () =>
consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))).getMessage)