This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a116753cc82 KAFKA-17986 Fix ConsumerRebootstrapTest and
ProducerRebootstrapTest (#18175)
a116753cc82 is described below
commit a116753cc820d42e4b4dc5b248caed0f05c379d9
Author: Peter Lee <[email protected]>
AuthorDate: Fri Jan 10 01:02:34 2025 +0800
KAFKA-17986 Fix ConsumerRebootstrapTest and ProducerRebootstrapTest (#18175)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../integration/kafka/api/ConsumerRebootstrapTest.scala | 1 -
.../integration/kafka/api/ProducerRebootstrapTest.scala | 3 +--
.../test/scala/integration/kafka/api/RebootstrapTest.scala | 14 +++++++++++++-
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
index 5d6622799fe..a20e6954f8e 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class ConsumerRebootstrapTest extends RebootstrapTest {
- @Disabled("KAFKA-17986")
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrap(quorum: String, groupProtocol: String,
useRebootstrapTriggerMs: Boolean): Unit = {
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
index f32c4433b45..aa8f46d7997 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -18,15 +18,14 @@ package kafka.api
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class ProducerRebootstrapTest extends RebootstrapTest {
- @Disabled("KAFKA-17986")
@ParameterizedTest(name =
"{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
@ValueSource(booleans = Array(false, true))
def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
+ // It's OK to shut the leader down, because the leader re-election time is small
compared to the producer timeout.
server1.shutdown()
server1.awaitShutdown()
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
index 68982405370..2d84284cd6b 100644
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -20,6 +20,7 @@ import kafka.server.{KafkaBroker, KafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
import java.util.Properties
@@ -29,10 +30,21 @@ abstract class RebootstrapTest extends AbstractConsumerTest
{
def server0: KafkaBroker = serverForId(0).get
def server1: KafkaBroker = serverForId(1).get
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.doSetup(testInfo, createOffsetsTopic = true)
+
+ // Enable unclean leader election for the test topic
+ val topicProps = new Properties
+ topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
+
+ // create the test topic with all the brokers as replicas
+ createTopic(topic, 2, brokerCount, adminClientConfig =
this.adminClientConfig, topicConfig = topicProps)
+ }
+
override def generateConfigs: Seq[KafkaConfig] = {
val overridingProps = new Properties()
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
brokerCount.toString)
- overridingProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
"true")
// In this test, fixed ports are necessary, because brokers must have the
// same port after the restart.