This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 163d00b3e61 KAFKA-14140: Ensure an offline or in-controlled-shutdown 
replica is not eligible to join ISR in ZK mode (#12487)
163d00b3e61 is described below

commit 163d00b3e61ac63cb0b65a4d9b23c410ee421c50
Author: Justine Olshan <[email protected]>
AuthorDate: Wed Aug 10 01:25:35 2022 -0700

    KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not 
eligible to join ISR in ZK mode (#12487)
    
    This patch prevents offline or in-controlled-shutdown replicas from being 
added back to the ISR, and therefore from becoming leaders, in ZK mode. This is an 
extra line of defense to ensure that this never happens. It is a continuation 
of the work done in KIP-841.
    
    Reviewers: David Mao <[email protected]>, Jason Gustafson 
<[email protected]>, Jun Rao <[email protected]>, David Jacot 
<[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   9 +-
 .../scala/kafka/controller/KafkaController.scala   |  18 +++-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  37 +++++--
 .../controller/ControllerIntegrationTest.scala     | 113 ++++++++++++++++-----
 4 files changed, 137 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 538c51f9035..1eab4c4669a 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -26,7 +26,7 @@ import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
-import kafka.server.metadata.KRaftMetadataCache
+import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zookeeper.ZooKeeperClientException
@@ -881,11 +881,16 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR.
       case kRaftMetadataCache: KRaftMetadataCache =>
         !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
 
+      // In ZK mode, we just ensure the broker is alive. Although we do not 
check for shutting down brokers here,
+      // the controller will block them from being added to ISR.
+      case zkMetadataCache: ZkMetadataCache =>
+        zkMetadataCache.hasAliveBroker(followerReplicaId)
+
       case _ => true
     }
   }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index d179bcd6ca4..0154d9cbe54 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2364,7 +2364,23 @@ class KafkaController(val config: KafkaConfig,
             )
             None
           } else {
-            Some(tp -> newLeaderAndIsr)
+            // Pull out replicas being added to ISR and verify they are all 
online.
+            // If a replica is not online, reject the update as specified in 
KIP-841.
+            val ineligibleReplicas = newLeaderAndIsr.isr.toSet -- 
controllerContext.liveBrokerIds
+            if (ineligibleReplicas.nonEmpty) {
+              info(s"Rejecting AlterPartition request from node $brokerId for 
$tp because " +
+                s"it specified ineligible replicas $ineligibleReplicas in the 
new ISR ${newLeaderAndIsr.isr}."
+              )
+
+              if (alterPartitionRequestVersion > 1) {
+                partitionResponses(tp) = Left(Errors.INELIGIBLE_REPLICA)
+              } else {
+                partitionResponses(tp) = Left(Errors.OPERATION_NOT_ATTEMPTED)
+              }
+              None
+            } else {
+              Some(tp -> newLeaderAndIsr)
+            }
           }
 
         case None =>
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 50382195794..948abc6c3b8 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -46,7 +46,7 @@ import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CountDownLatch, Semaphore}
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.metadata.KRaftMetadataCache
+import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.replica.ClientMetadata
@@ -55,6 +55,8 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
SecurityProtocol}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -1375,8 +1377,11 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
-  @Test
-  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
+    val kraft = quorum == "kraft"
+
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
@@ -1386,7 +1391,19 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List(brokerId, remoteBrokerId)
     val isr = Set(brokerId)
 
-    val metadataCache = mock(classOf[KRaftMetadataCache])
+    val metadataCache: MetadataCache = if (kraft) 
mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
+
+    // Mark the remote broker as eligible or ineligible in the metadata cache 
of the leader.
+    // When using kraft, we can make the broker ineligible by fencing it.
+    // In ZK mode, we must mark the broker as alive for it to be eligible.
+    def markRemoteReplicaEligible(eligible: Boolean): Unit = {
+      if (kraft) {
+        
when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
+      } else {
+        when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
+      }
+    }
+
     val partition = new Partition(
       topicPartition,
       replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
@@ -1414,6 +1431,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(isr, partition.partitionState.isr)
     assertEquals(isr, partition.partitionState.maximalIsr)
 
+    markRemoteReplicaEligible(true)
+
     // Fetch to let the follower catch up to the log end offset and
     // to check if an expansion is possible.
     fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 
log.logEndOffset)
@@ -1430,7 +1449,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
     assertEquals(1, alterPartitionManager.isrUpdates.size)
 
-    // Controller rejects the expansion because the broker is fenced.
+    // Controller rejects the expansion because the broker is fenced or 
offline.
     alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
 
     // The leader reverts back to the previous ISR.
@@ -1439,8 +1458,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertFalse(partition.partitionState.isInflight)
     assertEquals(0, alterPartitionManager.isrUpdates.size)
 
-    // The leader eventually learns about the fenced broker.
-    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
+    // The leader eventually learns about the fenced or offline broker.
+    markRemoteReplicaEligible(false)
 
     // The follower fetches again.
     fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 
log.logEndOffset)
@@ -1451,8 +1470,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertFalse(partition.partitionState.isInflight)
     assertEquals(0, alterPartitionManager.isrUpdates.size)
 
-    // The broker is eventually unfenced.
-    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(false)
+    // The broker is eventually unfenced or brought back online.
+    markRemoteReplicaEligible(true)
 
     // The follower fetches again.
     fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 
log.logEndOffset)
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 76188894371..0c8d000656a 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -26,11 +26,11 @@ import kafka.server.{KafkaConfig, KafkaServer, 
QuorumTestHarness}
 import kafka.utils.{LogCaptureAppender, TestUtils}
 import kafka.zk.{FeatureZNodeStatus, _}
 import org.apache.kafka.common.errors.{ControllerMovedException, 
StaleBrokerEpochException}
-import org.apache.kafka.common.message.AlterPartitionRequestData
-import org.apache.kafka.common.message.AlterPartitionResponseData
+import org.apache.kafka.common.message.{AlterPartitionRequestData, 
AlterPartitionResponseData}
 import org.apache.kafka.common.metrics.KafkaMetric
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
@@ -40,8 +40,7 @@ import org.apache.log4j.Level
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, 
assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.Arguments
-import org.junit.jupiter.params.provider.MethodSource
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 import org.mockito.Mockito.{doAnswer, spy, verify}
 import org.mockito.invocation.InvocationOnMock
 
@@ -904,12 +903,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
         ).asJava)
       ).asJava)
 
-    val future = new CompletableFuture[AlterPartitionResponseData]()
-    controller.eventManager.put(AlterPartitionReceived(
-      alterPartitionRequest,
-      alterPartitionVersion,
-      future.complete
-    ))
+    val future = alterPartitionFuture(alterPartitionRequest, 
alterPartitionVersion)
 
     val expectedAlterPartitionResponse = new AlterPartitionResponseData()
       .setTopics(Seq(new AlterPartitionResponseData.TopicData()
@@ -968,12 +962,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
         ).asJava)
       ).asJava)
 
-    val future = new CompletableFuture[AlterPartitionResponseData]()
-    controller.eventManager.put(AlterPartitionReceived(
-      alterPartitionRequest,
-      ApiKeys.ALTER_PARTITION.latestVersion,
-      future.complete
-    ))
+    val future = alterPartitionFuture(alterPartitionRequest, 
ApiKeys.ALTER_PARTITION.latestVersion)
 
     val expectedAlterPartitionResponse = new AlterPartitionResponseData()
       .setTopics(Seq(new AlterPartitionResponseData.TopicData()
@@ -1024,12 +1013,7 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
           ).asJava)
         ).asJava)
 
-      val future = new CompletableFuture[AlterPartitionResponseData]()
-      controller.eventManager.put(AlterPartitionReceived(
-        alterPartitionRequest,
-        AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
-        future.complete
-      ))
+    val future = alterPartitionFuture(alterPartitionRequest, 
AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION)
 
       // When re-sending an ISR update, we should not get and error or any ISR 
changes
       val expectedAlterPartitionResponse = new AlterPartitionResponseData()
@@ -1056,6 +1040,73 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
     sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = 
controller.controllerContext.partitionsLeadershipInfo
+    val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = 
controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), leaderAndIsr.isr)
+
+    val requestTopic = new AlterPartitionRequestData.TopicData()
+      .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+        .setNewIsr(fullIsr.map(Int.box).asJava)
+        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+    if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else 
requestTopic.setTopicName(tp.topic)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(requestTopic).asJava)
+
+    val future = alterPartitionFuture(alterPartitionRequest, 
alterPartitionVersion)
+
+    val expectedError = if (alterPartitionVersion > 1) 
Errors.INELIGIBLE_REPLICA else Errors.OPERATION_NOT_ATTEMPTED
+    val expectedResponseTopic = new AlterPartitionResponseData.TopicData()
+      .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setErrorCode(expectedError.code())
+        .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
+      ).asJava)
+    if (alterPartitionVersion > 1) expectedResponseTopic.setTopicId(topicId) 
else expectedResponseTopic.setTopicName(tp.topic)
+
+    // We expect an ineligible replica error response for the partition.
+    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+      .setTopics(Seq(expectedResponseTopic).asJava)
+
+    val newLeaderIsrAndControllerEpochMap = 
controller.controllerContext.partitionsLeadershipInfo
+    val newLeaderAndIsr = newLeaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
+    assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+    // Bring replica back online.
+    servers(brokerId).startup()
+
+    // Wait for broker to rejoin ISR.
+    TestUtils.waitUntilTrue(() => fullIsr == 
zkClient.getTopicPartitionState(tp).get.leaderAndIsr.isr, "Replica did not 
rejoin ISR.")
+  }
+
   @Test
   def testAlterPartitionErrors(): Unit = {
     servers = makeServers(2)
@@ -1338,12 +1389,7 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
           .setNewIsr(isr.toList.map(Int.box).asJava)
           .setLeaderRecoveryState(leaderRecoveryState)).asJava)).asJava)
 
-    val future = new CompletableFuture[AlterPartitionResponseData]()
-    getController().kafkaController.eventManager.put(AlterPartitionReceived(
-      alterPartitionRequest,
-      if (topicIdOpt.isDefined) 
AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 1,
-      future.complete
-    ))
+    val future = alterPartitionFuture(alterPartitionRequest, if 
(topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 
1)
 
     val expectedAlterPartitionResponse = if (topLevelError != Errors.NONE) {
       new AlterPartitionResponseData().setErrorCode(topLevelError.code)
@@ -1818,4 +1864,15 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
     servers.filter(s => s.config.brokerId == controllerId).head
   }
 
+  private def alterPartitionFuture(alterPartitionRequest: 
AlterPartitionRequestData,
+                                   alterPartitionVersion: Short): 
CompletableFuture[AlterPartitionResponseData] = {
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    getController().kafkaController.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))
+    future
+  }
+
 }

Reply via email to