This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 163d00b3e61 KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode (#12487)
163d00b3e61 is described below
commit 163d00b3e61ac63cb0b65a4d9b23c410ee421c50
Author: Justine Olshan <[email protected]>
AuthorDate: Wed Aug 10 01:25:35 2022 -0700
KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode (#12487)
This patch prevents offline or in-controlled-shutdown replicas from being
added back to the ISR and therefore from becoming leaders in ZK mode. This is an
extra line of defense to ensure that it never happens. This is a continuation
of the work done in KIP-841.
Reviewers: David Mao <[email protected]>, Jason Gustafson
<[email protected]>, Jun Rao <[email protected]>, David Jacot
<[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 9 +-
.../scala/kafka/controller/KafkaController.scala | 18 +++-
.../scala/unit/kafka/cluster/PartitionTest.scala | 37 +++++--
.../controller/ControllerIntegrationTest.scala | 113 ++++++++++++++++-----
4 files changed, 137 insertions(+), 40 deletions(-)
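In essence, the patch adds two independent guards. On the partition leader, the eligibility check is condensed below as a sketch (identifiers match Partition.scala in the diff that follows):

    // Leader-side check before proposing an ISR expansion.
    private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
      metadataCache match {
        // KRaft mode: the follower must be neither fenced nor in controlled shutdown.
        case kRaftMetadataCache: KRaftMetadataCache =>
          !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
            !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
        // ZK mode: the leader only checks liveness; the ZK controller independently
        // rejects AlterPartition requests that name non-live replicas.
        case zkMetadataCache: ZkMetadataCache =>
          zkMetadataCache.hasAliveBroker(followerReplicaId)
        case _ => true
      }
    }

On the ZK controller side, any AlterPartition request whose proposed ISR contains a replica outside controllerContext.liveBrokerIds is rejected with INELIGIBLE_REPLICA (request version > 1) or OPERATION_NOT_ATTEMPTED (version 1), as shown in the KafkaController.scala hunk below.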
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 538c51f9035..1eab4c4669a 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -26,7 +26,7 @@ import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
-import kafka.server.metadata.KRaftMetadataCache
+import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zookeeper.ZooKeeperClientException
@@ -881,11 +881,16 @@ class Partition(val topicPartition: TopicPartition,
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
metadataCache match {
// In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
- // allowed to join the ISR. This does not apply to ZK mode.
+ // allowed to join the ISR.
case kRaftMetadataCache: KRaftMetadataCache =>
!kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
!kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
+ // In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
+ // the controller will block them from being added to ISR.
+ case zkMetadataCache: ZkMetadataCache =>
+ zkMetadataCache.hasAliveBroker(followerReplicaId)
+
case _ => true
}
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d179bcd6ca4..0154d9cbe54 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2364,7 +2364,23 @@ class KafkaController(val config: KafkaConfig,
)
None
} else {
- Some(tp -> newLeaderAndIsr)
+ // Pull out replicas being added to ISR and verify they are all online.
+ // If a replica is not online, reject the update as specified in KIP-841.
+ val ineligibleReplicas = newLeaderAndIsr.isr.toSet -- controllerContext.liveBrokerIds
+ if (ineligibleReplicas.nonEmpty) {
+ info(s"Rejecting AlterPartition request from node $brokerId for
$tp because " +
+ s"it specified ineligible replicas $ineligibleReplicas in the
new ISR ${newLeaderAndIsr.isr}."
+ )
+
+ if (alterPartitionRequestVersion > 1) {
+ partitionResponses(tp) = Left(Errors.INELIGIBLE_REPLICA)
+ } else {
+ partitionResponses(tp) = Left(Errors.OPERATION_NOT_ATTEMPTED)
+ }
+ None
+ } else {
+ Some(tp -> newLeaderAndIsr)
+ }
}
case None =>
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 50382195794..948abc6c3b8 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -46,7 +46,7 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.metadata.KRaftMetadataCache
+import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.replica.ClientMetadata
@@ -55,6 +55,8 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@@ -1375,8 +1377,11 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(alterPartitionListener.failures.get, 1)
}
- @Test
- def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
+ val kraft = quorum == "kraft"
+
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
@@ -1386,7 +1391,19 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = Set(brokerId)
- val metadataCache = mock(classOf[KRaftMetadataCache])
+ val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
+
+ // Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
+ // When using kraft, we can make the broker ineligible by fencing it.
+ // In ZK mode, we must mark the broker as alive for it to be eligible.
+ def markRemoteReplicaEligible(eligible: Boolean): Unit = {
+ if (kraft) {
+ when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
+ } else {
+ when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
+ }
+ }
+
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
@@ -1414,6 +1431,8 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
+ markRemoteReplicaEligible(true)
+
// Fetch to let the follower catch up to the log end offset and
// to check if an expansion is possible.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
@@ -1430,7 +1449,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
- // Controller rejects the expansion because the broker is fenced.
+ // Controller rejects the expansion because the broker is fenced or offline.
alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
// The leader reverts back to the previous ISR.
@@ -1439,8 +1458,8 @@ class PartitionTest extends AbstractPartitionTest {
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
- // The leader eventually learns about the fenced broker.
- when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
+ // The leader eventually learns about the fenced or offline broker.
+ markRemoteReplicaEligible(false)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
@@ -1451,8 +1470,8 @@ class PartitionTest extends AbstractPartitionTest {
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
- // The broker is eventually unfenced.
- when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(false)
+ // The broker is eventually unfenced or brought back online.
+ markRemoteReplicaEligible(true)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 76188894371..0c8d000656a 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -26,11 +26,11 @@ import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.utils.{LogCaptureAppender, TestUtils}
import kafka.zk.{FeatureZNodeStatus, _}
import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
-import org.apache.kafka.common.message.AlterPartitionRequestData
-import org.apache.kafka.common.message.AlterPartitionResponseData
+import org.apache.kafka.common.message.{AlterPartitionRequestData, AlterPartitionResponseData}
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
@@ -40,8 +40,7 @@ import org.apache.log4j.Level
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.Arguments
-import org.junit.jupiter.params.provider.MethodSource
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.mockito.Mockito.{doAnswer, spy, verify}
import org.mockito.invocation.InvocationOnMock
@@ -904,12 +903,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
).asJava)
).asJava)
- val future = new CompletableFuture[AlterPartitionResponseData]()
- controller.eventManager.put(AlterPartitionReceived(
- alterPartitionRequest,
- alterPartitionVersion,
- future.complete
- ))
+ val future = alterPartitionFuture(alterPartitionRequest, alterPartitionVersion)
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
.setTopics(Seq(new AlterPartitionResponseData.TopicData()
@@ -968,12 +962,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
).asJava)
).asJava)
- val future = new CompletableFuture[AlterPartitionResponseData]()
- controller.eventManager.put(AlterPartitionReceived(
- alterPartitionRequest,
- ApiKeys.ALTER_PARTITION.latestVersion,
- future.complete
- ))
+ val future = alterPartitionFuture(alterPartitionRequest, ApiKeys.ALTER_PARTITION.latestVersion)
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
.setTopics(Seq(new AlterPartitionResponseData.TopicData()
@@ -1024,12 +1013,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
).asJava)
).asJava)
- val future = new CompletableFuture[AlterPartitionResponseData]()
- controller.eventManager.put(AlterPartitionReceived(
- alterPartitionRequest,
- AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
- future.complete
- ))
+ val future = alterPartitionFuture(alterPartitionRequest, AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION)
// When re-sending an ISR update, we should not get an error or any ISR changes
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
@@ -1056,6 +1040,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+ servers = makeServers(2)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val otherBroker = servers.find(_.config.brokerId != controllerId).get
+ val brokerId = otherBroker.config.brokerId
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+ val fullIsr = List(controllerId, brokerId)
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+ // Shut down follower.
+ servers(brokerId).shutdown()
+ servers(brokerId).awaitShutdown()
+
+ val controller = getController().kafkaController
+ val leaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
+ val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+ val topicId = controller.controllerContext.topicIds(tp.topic)
+ val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+ // We expect only the controller (online broker) to be in ISR
+ assertEquals(List(controllerId), leaderAndIsr.isr)
+
+ val requestTopic = new AlterPartitionRequestData.TopicData()
+ .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+ .setPartitionIndex(tp.partition)
+ .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+ .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+ .setNewIsr(fullIsr.map(Int.box).asJava)
+ .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+ if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else requestTopic.setTopicName(tp.topic)
+
+ // Try to update ISR to contain the offline broker.
+ val alterPartitionRequest = new AlterPartitionRequestData()
+ .setBrokerId(controllerId)
+ .setBrokerEpoch(controllerEpoch)
+ .setTopics(Seq(requestTopic).asJava)
+
+ val future = alterPartitionFuture(alterPartitionRequest, alterPartitionVersion)
+
+ val expectedError = if (alterPartitionVersion > 1) Errors.INELIGIBLE_REPLICA else Errors.OPERATION_NOT_ATTEMPTED
+ val expectedResponseTopic = new AlterPartitionResponseData.TopicData()
+ .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(expectedError.code())
+ .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
+ ).asJava)
+ if (alterPartitionVersion > 1) expectedResponseTopic.setTopicId(topicId) else expectedResponseTopic.setTopicName(tp.topic)
+
+ // We expect an ineligible replica error response for the partition.
+ val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+ .setTopics(Seq(expectedResponseTopic).asJava)
+
+ val newLeaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
+ val newLeaderAndIsr = newLeaderIsrAndControllerEpochMap(tp).leaderAndIsr
+ assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+ assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+ // Bring replica back online.
+ servers(brokerId).startup()
+
+ // Wait for broker to rejoin ISR.
+ TestUtils.waitUntilTrue(() => fullIsr == zkClient.getTopicPartitionState(tp).get.leaderAndIsr.isr, "Replica did not rejoin ISR.")
+ }
+
@Test
def testAlterPartitionErrors(): Unit = {
servers = makeServers(2)
@@ -1338,12 +1389,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
.setNewIsr(isr.toList.map(Int.box).asJava)
.setLeaderRecoveryState(leaderRecoveryState)).asJava)).asJava)
- val future = new CompletableFuture[AlterPartitionResponseData]()
- getController().kafkaController.eventManager.put(AlterPartitionReceived(
- alterPartitionRequest,
- if (topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 1,
- future.complete
- ))
+ val future = alterPartitionFuture(alterPartitionRequest, if (topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 1)
val expectedAlterPartitionResponse = if (topLevelError != Errors.NONE) {
new AlterPartitionResponseData().setErrorCode(topLevelError.code)
@@ -1818,4 +1864,15 @@ class ControllerIntegrationTest extends QuorumTestHarness {
servers.filter(s => s.config.brokerId == controllerId).head
}
+ private def alterPartitionFuture(alterPartitionRequest: AlterPartitionRequestData,
+ alterPartitionVersion: Short): CompletableFuture[AlterPartitionResponseData] = {
+ val future = new CompletableFuture[AlterPartitionResponseData]()
+ getController().kafkaController.eventManager.put(AlterPartitionReceived(
+ alterPartitionRequest,
+ alterPartitionVersion,
+ future.complete
+ ))
+ future
+ }
+
}