jsancio commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r894069336
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -159,48 +158,63 @@ sealed trait PartitionState {
}
sealed trait PendingPartitionChange extends PartitionState {
+ def lastCommittedState: CommittedPartitionState
def sentLeaderAndIsr: LeaderAndIsr
override val leaderRecoveryState: LeaderRecoveryState =
LeaderRecoveryState.RECOVERED
+
+ def notifyListener(alterPartitionListener: AlterPartitionListener): Unit
}
case class PendingExpandIsr(
- isr: Set[Int],
newInSyncReplicaId: Int,
- sentLeaderAndIsr: LeaderAndIsr
+ sentLeaderAndIsr: LeaderAndIsr,
+ lastCommittedState: CommittedPartitionState
) extends PendingPartitionChange {
+ val isr = lastCommittedState.isr
val maximalIsr = isr + newInSyncReplicaId
val isInflight = true
+ def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
+ alterPartitionListener.markIsrExpand()
+ }
+
override def toString: String = {
s"PendingExpandIsr(isr=$isr" +
Review Comment:
We can remove printing the `isr` since it is duplicated in
`lastCommittedState`. Same for `PendingShrinkIsr`.
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -124,7 +125,20 @@ class DefaultAlterPartitionManager(
val metadataVersionSupplier: () => MetadataVersion
) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
- // Used to allow only one pending ISR update per partition (visible for
testing)
+ // Used to allow only one pending ISR update per partition (visible for
testing).
+ // Note that we key items by TopicPartition despite using TopicIdPartition
while
+ // submitting changes. We do this to ensure that topics with the same name
but
+ // with a different topic id or no topic id collide here. There are two
cases to
+ // consider:
+ // 1) When the cluster is upgraded from IBP < 2.8 to IBP >= 2.8, the ZK
controller
+ // assigns topic ids to the partitions. So partitions will start sending
updates
+ // with a topic id while they might still have updates without topic ids
in this
+ // Map. This would break the contract of only allowing one pending ISR
update per
+ // partition.
+ // 2) When a topic is deleted and re-created, we cannot have two entries in
this Map
+ // especially if we cannot use an AlterPartition request version which
supports
+ // topic ids in the end because the two updates with the same name would
be merged
+ // together.
Review Comment:
Okay. I think in most cases this would fail because the partition epochs
would not match. It would only succeed if the controller created the new topic
partition with the same assignment and leader, and the partition epoch was
never increased for the old partition.
##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,225 @@ class KafkaController(val config: KafkaConfig,
}
}
- def alterPartitions(alterPartitionRequest: AlterPartitionRequestData,
callback: AlterPartitionResponseData => Unit): Unit = {
- val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
- alterPartitionRequest.topics.forEach { topicReq =>
- topicReq.partitions.forEach { partitionReq =>
- partitionsToAlter.put(
- new TopicPartition(topicReq.name, partitionReq.partitionIndex),
- LeaderAndIsr(
- alterPartitionRequest.brokerId,
- partitionReq.leaderEpoch,
- partitionReq.newIsr().asScala.toList.map(_.toInt),
- LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
- partitionReq.partitionEpoch
- )
- )
- }
- }
+ def alterPartitions(
+ alterPartitionRequest: AlterPartitionRequestData,
+ alterPartitionRequestVersion: Short,
+ callback: AlterPartitionResponseData => Unit
+ ): Unit = {
+ eventManager.put(AlterPartitionReceived(
+ alterPartitionRequest,
+ alterPartitionRequestVersion,
+ callback
+ ))
+ }
- def responseCallback(results: Either[Map[TopicPartition, Either[Errors,
LeaderAndIsr]], Errors]): Unit = {
- val resp = new AlterPartitionResponseData()
- results match {
- case Right(error) =>
- resp.setErrorCode(error.code)
- case Left(partitionResults) =>
- resp.setTopics(new util.ArrayList())
- partitionResults
- .groupBy { case (tp, _) => tp.topic } // Group by topic
- .foreach { case (topic, partitions) =>
- // Add each topic part to the response
- val topicResp = new AlterPartitionResponseData.TopicData()
- .setName(topic)
- .setPartitions(new util.ArrayList())
- resp.topics.add(topicResp)
- partitions.foreach { case (tp, errorOrIsr) =>
- // Add each partition part to the response (new ISR or error)
- errorOrIsr match {
- case Left(error) => topicResp.partitions.add(
- new AlterPartitionResponseData.PartitionData()
- .setPartitionIndex(tp.partition)
- .setErrorCode(error.code))
- case Right(leaderAndIsr) =>
- /* Setting the LeaderRecoveryState field is always safe
because it will always be the same
- * as the value set in the request. For version 0, that is
always the default RECOVERED
- * which is ignored when serializing to version 0. For any
other version, the
- * LeaderRecoveryState field is supported.
- */
- topicResp.partitions.add(
- new AlterPartitionResponseData.PartitionData()
- .setPartitionIndex(tp.partition)
- .setLeaderId(leaderAndIsr.leader)
- .setLeaderEpoch(leaderAndIsr.leaderEpoch)
- .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
- .setPartitionEpoch(leaderAndIsr.partitionEpoch)
- )
- }
- }
- }
- }
- callback.apply(resp)
+ private def processAlterPartition(
+ alterPartitionRequest: AlterPartitionRequestData,
+ alterPartitionRequestVersion: Short,
+ callback: AlterPartitionResponseData => Unit
+ ): Unit = {
+ try {
+ doProcessAlterPartition(
+ alterPartitionRequest,
+ alterPartitionRequestVersion,
+ callback
+ )
+ } catch {
+ case e: Throwable =>
+ error(s"Error when processing AlterPartition: $alterPartitionRequest",
e)
+ callback(new
AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
}
-
- eventManager.put(
- AlterPartitionReceived(alterPartitionRequest.brokerId,
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
- )
}
- private def processAlterPartition(
- brokerId: Int,
- brokerEpoch: Long,
- partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
- callback: AlterPartitionCallback
+ private def doProcessAlterPartition(
+ alterPartitionRequest: AlterPartitionRequestData,
+ alterPartitionRequestVersion: Short,
+ callback: AlterPartitionResponseData => Unit
): Unit = {
+ val useTopicsIds = alterPartitionRequestVersion > 1
+
+ if (useTopicsIds && !config.usesTopicId) {
+ callback(new
AlterPartitionResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code))
Review Comment:
Don't we need to allow this? For example:
1. All of the nodes are running software version 2.8 or greater.
2. The IBP of the controller is changed to 2.8, and all of the topics are
assigned a topic id.
3. The controller fails over to a broker that is on IBP 2.7 or less.
Do we need a test that verifies this case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]