This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c01fd0b763 KAFKA-18949 add consumer protocol to testDeleteRecordsAfterCorruptRecords (#19317)
5c01fd0b763 is described below

commit 5c01fd0b763ee154d2bc1bdbb6614f4bcfd9a054
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Apr 3 13:24:25 2025 +0800

    KAFKA-18949 add consumer protocol to testDeleteRecordsAfterCorruptRecords (#19317)
    
    The `PlaintextAdminIntegrationTest#testDeleteRecordsAfterCorruptRecords`
    test was only enabled for the classic group protocol. Enable it for the
    consumer group protocol as well.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../integration/kafka/api/PlaintextAdminIntegrationTest.scala  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index fa563971edb..d1e04f8197a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1565,7 +1565,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  
@MethodSource(Array("getTestGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testDeleteRecordsAfterCorruptRecords(groupProtocol: String): Unit = {
     val config = new Properties()
     config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200")
@@ -1573,9 +1573,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     client = createAdminClient
 
-    val consumer = createConsumer()
-    subscribeAndWaitForAssignment(topic, consumer)
-
     val producer = createProducer()
     def sendRecords(begin: Int, end: Int) = {
       val futures = (begin until end).map( i => {
@@ -1608,7 +1605,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     newContent.flip()
     Files.write(logFilePath, newContent.array(), 
StandardOpenOption.TRUNCATE_EXISTING)
 
-    consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    val overrideConfig = new Properties
+    overrideConfig.setProperty("auto.offset.reset", "earliest")
+    val consumer = createConsumer(configOverrides = overrideConfig)
+    consumer.subscribe(Seq(topic).asJava)
     assertEquals("Encountered corrupt message when fetching offset 0 for 
topic-partition topic-0",
       assertThrows(classOf[KafkaException], () => 
consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))).getMessage)
 

Reply via email to