This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 168d5c3 KAFKA-13132; Ensure topicId is updated on replicas even when
the leader epoch is unchanged (#11126)
168d5c3 is described below
commit 168d5c3a7cf23dcda0aff99e82bfc07a36494d56
Author: Justine Olshan <[email protected]>
AuthorDate: Thu Jul 29 14:36:44 2021 -0700
KAFKA-13132; Ensure topicId is updated on replicas even when the leader
epoch is unchanged (#11126)
In 3.0, there was a change that resulted in no longer assigning topic IDs
to the log and the partition.metadata file in certain upgrade scenarios,
specifically when upgrading from IBP 2.7 or below to 3.0. In this case, there
may not be a bump to the leader epoch when the topicId is assigned by the
controller, so the LeaderAndIsr request from the controller would be ignored by
the replica. This PR fixes the problem by adding a check for whether we need to
handle the LeaderAndIsr request [...]
Reviewers: Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 28 +++++++---
.../controller/ControllerIntegrationTest.scala | 65 +++++++++++++++++++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 57 ++++++++++++++++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 5 +-
4 files changed, 136 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f03571c..84ccd8e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1349,9 +1349,10 @@ class ReplicaManager(val config: KafkaConfig,
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
val requestTopicId = topicIdFromRequest(topicPartition.topic)
+ val logTopicId = partition.topicId
- if (!hasConsistentTopicId(requestTopicId, partition.topicId)) {
- stateChangeLogger.error(s"Topic ID in memory:
${partition.topicId.get} does not" +
+ if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
+ stateChangeLogger.error(s"Topic ID in memory:
${logTopicId.get} does not" +
s" match the topic ID for partition $topicPartition
received: " +
s"${requestTopicId.get}.")
responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
@@ -1374,11 +1375,24 @@ class ReplicaManager(val config: KafkaConfig,
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
} else {
- stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
- s"controller $controllerId with correlation id
$correlationId " +
- s"epoch $controllerEpoch for partition $topicPartition since
its associated " +
- s"leader epoch $requestLeaderEpoch matches the current
leader epoch")
- responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+ val error = requestTopicId match {
+ case Some(topicId) if logTopicId.isEmpty =>
+ // The controller may send LeaderAndIsr to upgrade to
using topic IDs without bumping the epoch.
+ // If we have a matching epoch, we expect the log to be
defined.
+ val log = localLogOrException(partition.topicPartition)
+ log.assignTopicId(topicId)
+ stateChangeLogger.info(s"Updating log for $topicPartition
to assign topic ID " +
+ s"$topicId from LeaderAndIsr request from controller
$controllerId with correlation " +
+ s"id $correlationId epoch $controllerEpoch")
+ Errors.NONE
+ case _ =>
+ stateChangeLogger.info(s"Ignoring LeaderAndIsr request
from " +
+ s"controller $controllerId with correlation id
$correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition
since its associated " +
+ s"leader epoch $requestLeaderEpoch matches the current
leader epoch")
+ Errors.STALE_CONTROLLER_EPOCH
+ }
+ responseMap.put(topicPartition, error)
}
}
}
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index c0e8611..987ec2d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1115,6 +1115,66 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
assertEquals(topicIdAfterUpgrade.get, topicId)
assertEquals("t", controller2.controllerContext.topicNames(topicId))
+ TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined,
"log was not created")
+
+ val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
+ assertEquals(Some(topicId), topicIdInLog)
+
+ adminZkClient.deleteTopic(tp.topic)
+ TestUtils.waitUntilTrue(() =>
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
+ "topic should have been removed from controller context after deletion")
+ }
+
+ @Test
+ def testTopicIdCreatedOnUpgradeMultiBrokerScenario(): Unit = {
+ // Simulate an upgrade scenario where the controller is still on a
pre-topic ID IBP, but the other two brokers are upgraded.
+ servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+ servers = servers ++ makeServers(3, startingIdNumber = 1)
+ val originalControllerId = TestUtils.waitUntilControllerElected(zkClient)
+ assertEquals(0, originalControllerId)
+ val controller = getController().kafkaController
+ assertEquals(KAFKA_2_7_IV0,
servers(originalControllerId).config.interBrokerProtocolVersion)
+ val remainingBrokers = servers.filter(_.config.brokerId !=
originalControllerId)
+ val tp = new TopicPartition("t", 0)
+ // Only the remaining brokers will have the replicas for the partition
+ val assignment = Map(tp.partition ->
remainingBrokers.map(_.config.brokerId))
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
+ waitForPartitionState(tp, firstControllerEpoch,
remainingBrokers(0).config.brokerId, LeaderAndIsr.initialLeaderEpoch,
+ "failed to get expected partition state upon topic creation")
+ val topicIdAfterCreate =
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+ assertEquals(None, topicIdAfterCreate)
+ val emptyTopicId = controller.controllerContext.topicIds.get("t")
+ assertEquals(None, emptyTopicId)
+
+ // All partition logs should not have topic IDs
+ remainingBrokers.foreach { server =>
+ TestUtils.waitUntilTrue(() => server.logManager.getLog(tp).isDefined,
"log was not created for server" + server.config.brokerId)
+ val topicIdInLog = server.logManager.getLog(tp).get.topicId
+ assertEquals(None, topicIdInLog)
+ }
+
+ // Shut down the controller to transfer the controller to a new IBP broker.
+ servers(originalControllerId).shutdown()
+ servers(originalControllerId).awaitShutdown()
+ // If we were upgrading, this server would be the latest IBP, but it
doesn't matter in this test scenario
+ servers(originalControllerId).startup()
+ TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed
to elect a controller")
+ val topicIdAfterUpgrade =
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+ assertNotEquals(emptyTopicId, topicIdAfterUpgrade)
+ val controller2 = getController().kafkaController
+ assertNotEquals(emptyTopicId,
controller2.controllerContext.topicIds.get("t"))
+ val topicId = controller2.controllerContext.topicIds.get("t").get
+ assertEquals(topicIdAfterUpgrade.get, topicId)
+ assertEquals("t", controller2.controllerContext.topicNames(topicId))
+
+ // All partition logs should have topic IDs
+ remainingBrokers.foreach { server =>
+ TestUtils.waitUntilTrue(() => server.logManager.getLog(tp).isDefined,
"log was not created for server" + server.config.brokerId)
+ val topicIdInLog = server.logManager.getLog(tp).get.topicId
+ assertEquals(Some(topicId), topicIdInLog,
+ s"Server ${server.config.brokerId} had topic ID $topicIdInLog instead
of ${Some(topicId)} as expected.")
+ }
+
adminZkClient.deleteTopic(tp.topic)
TestUtils.waitUntilTrue(() =>
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
"topic should have been removed from controller context after deletion")
@@ -1287,8 +1347,9 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
listenerSecurityProtocolMap : Option[String] = None,
controlPlaneListenerName : Option[String] = None,
interBrokerProtocolVersion: Option[ApiVersion] =
None,
- logDirCount: Int = 1) = {
- val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect,
enableControlledShutdown = enableControlledShutdown, logDirCount = logDirCount)
+ logDirCount: Int = 1,
+ startingIdNumber: Int = 0) = {
+ val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect,
enableControlledShutdown = enableControlledShutdown, logDirCount = logDirCount,
startingIdNumber = startingIdNumber )
configs.foreach { config =>
config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp,
autoLeaderRebalanceEnable.toString)
config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp,
uncleanLeaderElectionEnable.toString)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index fd6ba75..f92b2d2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -252,7 +252,6 @@ class ReplicaManagerTest {
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints),
None)
- val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -265,7 +264,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
- topicIds,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _)
=> ())
@@ -1228,7 +1227,6 @@ class ReplicaManagerTest {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
- val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -1241,7 +1239,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
- topicIds,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1262,7 +1260,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
- topicIds,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) =>
())
@@ -1279,7 +1277,6 @@ class ReplicaManagerTest {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
- val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -1292,7 +1289,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
- topicIds,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@@ -1314,7 +1311,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
- topicIds,
+ Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) =>
())
@@ -2357,6 +2354,50 @@ class ReplicaManagerTest {
}
@Test
+ def testPartitionMetadataFileCreatedAfterPreviousRequestWithoutIds(): Unit =
{
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))
+ try {
+ val brokerList = Seq[Integer](0, 1).asJava
+ val topicPartition = new TopicPartition(topic, 0)
+ val topicId = Uuid.randomUuid()
+ val topicIds = Collections.singletonMap(topic, topicId)
+ val topicNames = Collections.singletonMap(topicId, topic)
+
+ def leaderAndIsrRequest(topicIds: util.Map[String, Uuid], version:
Short): LeaderAndIsrRequest =
+ new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(0)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(0)
+ .setIsr(brokerList)
+ .setZkVersion(0)
+ .setReplicas(brokerList)
+ .setIsNew(true)).asJava,
+ topicIds,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+ // Send a request without a topic ID so that we have a log without a
topic ID associated to the partition.
+ val response = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(Collections.emptyMap(), 4), (_, _) => ())
+ assertEquals(Errors.NONE,
response.partitionErrors(Collections.emptyMap()).get(topicPartition))
+ assertTrue(replicaManager.localLog(topicPartition).isDefined)
+ val log = replicaManager.localLog(topicPartition).get
+ assertFalse(log.partitionMetadataFile.exists())
+ assertTrue(log.topicId.isEmpty)
+
+ val response2 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(topicIds, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) =>
())
+ assertEquals(Errors.NONE,
response2.partitionErrors(topicNames).get(topicPartition))
+ assertTrue(replicaManager.localLog(topicPartition).isDefined)
+ assertTrue(log.partitionMetadataFile.exists())
+ assertTrue(log.topicId.isDefined)
+ assertEquals(topicId, log.topicId.get)
+
+ assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+ } finally replicaManager.shutdown(checkpointHW = false)
+ }
+
+ @Test
def testInconsistentIdReturnsError(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))
try {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9ff2f3a..bd9c02c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -204,8 +204,9 @@ object TestUtils extends Logging {
logDirCount: Int = 1,
enableToken: Boolean = false,
numPartitions: Int = 1,
- defaultReplicationFactor: Short = 1): Seq[Properties] = {
- (0 until numConfigs).map { node =>
+ defaultReplicationFactor: Short = 1,
+ startingIdNumber: Int = 0): Seq[Properties] = {
+ (startingIdNumber until numConfigs).map { node =>
createBrokerConfig(node, zkConnect, enableControlledShutdown,
enableDeleteTopic, RandomPort,
interBrokerSecurityProtocol, trustStoreFile, saslProperties,
enablePlaintext = enablePlaintext, enableSsl = enableSsl,
enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl =
enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount,
enableToken = enableToken,