This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a116753cc82 KAFKA-17986 Fix ConsumerRebootstrapTest and 
ProducerRebootstrapTest (#18175)
a116753cc82 is described below

commit a116753cc820d42e4b4dc5b248caed0f05c379d9
Author: Peter Lee <[email protected]>
AuthorDate: Fri Jan 10 01:02:34 2025 +0800

    KAFKA-17986 Fix ConsumerRebootstrapTest and ProducerRebootstrapTest (#18175)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../integration/kafka/api/ConsumerRebootstrapTest.scala    |  1 -
 .../integration/kafka/api/ProducerRebootstrapTest.scala    |  3 +--
 .../test/scala/integration/kafka/api/RebootstrapTest.scala | 14 +++++++++++++-
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
index 5d6622799fe..a20e6954f8e 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeoutException
 
 class ConsumerRebootstrapTest extends RebootstrapTest {
-  @Disabled("KAFKA-17986")
   @ParameterizedTest(name = RebootstrapTestName)
   @MethodSource(Array("rebootstrapTestParams"))
   def testRebootstrap(quorum: String, groupProtocol: String, 
useRebootstrapTriggerMs: Boolean): Unit = {
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
index f32c4433b45..aa8f46d7997 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -18,15 +18,14 @@ package kafka.api
 
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 class ProducerRebootstrapTest extends RebootstrapTest {
-  @Disabled("KAFKA-17986")
   @ParameterizedTest(name = 
"{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
   @ValueSource(booleans = Array(false, true))
   def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
+    // It's ok to shut the leader down, cause the reelection is small enough 
to the producer timeout.
     server1.shutdown()
     server1.awaitShutdown()
 
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
index 68982405370..2d84284cd6b 100644
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -20,6 +20,7 @@ import kafka.server.{KafkaBroker, KafkaConfig}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 import java.util.Properties
 
@@ -29,10 +30,21 @@ abstract class RebootstrapTest extends AbstractConsumerTest 
{
   def server0: KafkaBroker = serverForId(0).get
   def server1: KafkaBroker = serverForId(1).get
 
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.doSetup(testInfo, createOffsetsTopic = true)
+
+    // Enable unclean leader election for the test topic
+    val topicProps = new Properties
+    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
+
+    // create the test topic with all the brokers as replicas
+    createTopic(topic, 2, brokerCount, adminClientConfig = 
this.adminClientConfig, topicConfig = topicProps)
+  }
+
   override def generateConfigs: Seq[KafkaConfig] = {
     val overridingProps = new Properties()
     
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 brokerCount.toString)
-    overridingProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
"true")
 
     // In this test, fixed ports are necessary, because brokers must have the
     // same port after the restart.

Reply via email to