This is an automated email from the ASF dual-hosted git repository.

rndgstn pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e0282395dc KAFKA-15366: Modify LogDirFailureTest for KRaft (#14977)
0e0282395dc is described below

commit 0e0282395dc69b351812bf9fa060dd282e07245c
Author: Viktor Somogyi-Vass <viktorsomo...@gmail.com>
AuthorDate: Wed Dec 20 03:02:49 2023 +0100

    KAFKA-15366: Modify LogDirFailureTest for KRaft (#14977)
    
    Reviewers: Omnia G.H Ibrahim <o.g.h.ibra...@gmail.com>, Ron Dagostino <rdagost...@confluent.io>, Igor Soarez <soa...@apple.com>
---
 .../unit/kafka/server/LogDirFailureTest.scala      | 82 ++++++++++++++--------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  3 +-
 2 files changed, 53 insertions(+), 32 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index a403088d58b..c63f4664096 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -21,8 +21,8 @@ import java.util.Collections
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.api.IntegrationTestHarness
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
-import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll}
-import kafka.utils.{CoreUtils, Exit, TestUtils}
+import kafka.utils.TestUtils.{waitUntilTrue, Checkpoint, LogDirFailureType, Roll}
+import kafka.utils.{CoreUtils, Exit, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
@@ -30,6 +30,8 @@ import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowe
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.ParameterizedTest
 
 import java.nio.file.Files
 import scala.annotation.nowarn
@@ -56,20 +58,22 @@ class LogDirFailureTest extends IntegrationTestHarness {
     createTopic(topic, partitionNum, brokerCount)
   }
 
-  @Test
-  def testProduceErrorFromFailureOnLogRoll(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceErrorFromFailureOnLogRoll(quorum: String): Unit = {
     testProduceErrorsFromLogDirFailureOnLeader(Roll)
   }
 
-  @Test
-  def testIOExceptionDuringLogRoll(): Unit = {
-    testProduceAfterLogDirFailureOnLeader(Roll)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIOExceptionDuringLogRoll(quorum: String): Unit = {
+    testProduceAfterLogDirFailureOnLeader(Roll, quorum)
   }
 
   // Broker should halt on any log directory failure if inter-broker protocol < 1.0
   @nowarn("cat=deprecation")
   @Test
-  def brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure(): Unit = {
+  def testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure(): Unit = {
     @volatile var statusCodeOption: Option[Int] = None
     Exit.setHaltProcedure { (statusCode, _) =>
       statusCodeOption = Some(statusCode)
@@ -97,18 +101,21 @@ class LogDirFailureTest extends IntegrationTestHarness {
     }
   }
 
-  @Test
-  def testProduceErrorFromFailureOnCheckpoint(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceErrorFromFailureOnCheckpoint(quorum: String): Unit = {
     testProduceErrorsFromLogDirFailureOnLeader(Checkpoint)
   }
 
-  @Test
-  def testIOExceptionDuringCheckpoint(): Unit = {
-    testProduceAfterLogDirFailureOnLeader(Checkpoint)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIOExceptionDuringCheckpoint(quorum: String): Unit = {
+    testProduceAfterLogDirFailureOnLeader(Checkpoint, quorum)
   }
 
-  @Test
-  def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReplicaFetcherThreadAfterLogDirFailureOnFollower(quorum: String): Unit = {
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
     this.producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
     val producer = createProducer()
@@ -116,9 +123,9 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
     val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get
     val leaderServerId = partitionInfo.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val leaderServer = brokers.find(_.config.brokerId == leaderServerId).get
     val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get
-    val followerServer = servers.find(_.config.brokerId == followerServerId).get
+    val followerServer = brokers.find(_.config.brokerId == followerServerId).get
 
     followerServer.replicaManager.markPartitionOffline(partition)
     // Send a message to another partition whose leader is the same as partition 0
@@ -149,7 +156,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
 
     val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val leaderServer = brokers.find(_.config.brokerId == leaderServerId).get
 
     TestUtils.causeLogDirFailure(failureType, leaderServer, partition)
 
@@ -160,7 +167,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
       e.getCause.isInstanceOf[NotLeaderOrFollowerException])
   }
 
-  def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
+  def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, quorum: String): Unit = {
     val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
@@ -169,20 +176,20 @@ class LogDirFailureTest extends IntegrationTestHarness {
     val partition = new TopicPartition(topic, 0)
     val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
 
-    val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val originalLeaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
+    val originalLeaderServer = brokers.find(_.config.brokerId == originalLeaderServerId).get
 
     // The first send() should succeed
     producer.send(record).get()
     TestUtils.consumeRecords(consumer, 1)
 
-    TestUtils.causeLogDirFailure(failureType, leaderServer, partition)
+    val failedLogDir = TestUtils.causeLogDirFailure(failureType, originalLeaderServer, partition)
 
     TestUtils.waitUntilTrue(() => {
       // ProduceResponse may contain KafkaStorageException and trigger metadata update
       producer.send(record)
-      producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId
-    }, "Expected new leader for the partition", 6000L)
+      producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != originalLeaderServerId
+    }, "Expected new leader for the partition")
 
     // Block on send to ensure that new leader accepts a message.
     producer.send(record).get(6000L, TimeUnit.MILLISECONDS)
@@ -190,13 +197,26 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // Consumer should receive some messages
     TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
 
-    // There should be no remaining LogDirEventNotification znode
-    assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
-
-    // The controller should have marked the replica on the original leader as offline
-    val controllerServer = servers.find(_.kafkaController.isActive).get
-    val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-    assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId)))
+    if (quorum == "kraft") {
+      waitUntilTrue(() => {
+        // get the broker with broker.nodeId == originalLeaderServerId
+        val brokerWithDirFail = brokers.find(_.config.nodeId == originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
+        // check if the broker has the offline log dir
+        val hasOfflineDir = brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
+        // check if the broker has the offline replica
+        hasOfflineDir && brokerWithDirFail.exists(broker =>
+          broker.replicaManager.metadataCache
+            .getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName)
+            .partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
+      }, "Expected to find an offline log dir")
+    } else {
+      // There should be no remaining LogDirEventNotification znode
+      assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
+      // The controller should have marked the replica on the original leader as offline
+      val controllerServer = servers.find(_.kafkaController.isActive).get
+      val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
+      assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), originalLeaderServerId)))
+    }
   }
 
 
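For context on the mechanism used throughout this change: each former @Test method now takes a quorum argument and is run once per value supplied by @ValueSource, and the name = TestInfoUtils.TestWithParameterizedQuorumName attribute puts the quorum into the test's display name, which Kafka's test harness reads to decide whether to bring up a ZooKeeper or a KRaft cluster. Below is a minimal, self-contained sketch of just the JUnit 5 parameterization; the class and method names are illustrative and not part of this commit.

    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    // Illustrative only: shows how a single test body is driven once per
    // quorum string, mirroring the annotations added in the diff above.
    class QuorumParameterizationSketch {

      @ParameterizedTest
      @ValueSource(strings = Array("zk", "kraft"))
      def testRunsUnderBothQuorums(quorum: String): Unit = {
        // The body can branch on the quorum value, as
        // testProduceAfterLogDirFailureOnLeader now does for its assertions.
        assert(quorum == "zk" || quorum == "kraft")
      }
    }
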
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7887753a9d1..9eb284729bd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1641,7 +1641,7 @@ object TestUtils extends Logging {
   }
 
 
-  def causeLogDirFailure(failureType: LogDirFailureType, leaderBroker: KafkaBroker, partition: TopicPartition): Unit = {
+  def causeLogDirFailure(failureType: LogDirFailureType, leaderBroker: KafkaBroker, partition: TopicPartition): File = {
     // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
     val localLog = leaderBroker.replicaManager.localLogOrException(partition)
     val logDir = localLog.dir.getParentFile
@@ -1658,6 +1658,7 @@ object TestUtils extends Logging {
     // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
     waitUntilTrue(() => !leaderBroker.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
     assertTrue(leaderBroker.replicaManager.localLog(partition).isEmpty)
+    logDir
   }
 
   /**
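
Since causeLogDirFailure now returns the directory it disabled, callers can assert directly against that path, as the KRaft branch above does through LogDirFailureChannel. A rough usage sketch follows; the helper, its enclosing object, and the package placement are hypothetical, and it assumes a KRaft BrokerServer and a TopicPartition supplied by a harness like the test above.

    package kafka.server

    import java.io.File

    import kafka.utils.TestUtils
    import kafka.utils.TestUtils.Checkpoint
    import org.apache.kafka.common.TopicPartition
    import org.junit.jupiter.api.Assertions.assertTrue

    object LogDirFailureUsageSketch {
      // Hypothetical helper: triggers a checkpoint-time log dir failure on the
      // given broker and checks that the directory returned by
      // causeLogDirFailure is the one the failure channel reports as offline.
      def failAndVerify(broker: BrokerServer, partition: TopicPartition): File = {
        val failedDir = TestUtils.causeLogDirFailure(Checkpoint, broker, partition)
        assertTrue(broker.logDirFailureChannel.hasOfflineLogDir(failedDir.toPath.toString))
        failedDir
      }
    }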
