This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.7 by this push:
     new a4f1aa2e630 KAFKA-16297: Race condition while promoting future replica (#15557)
a4f1aa2e630 is described below

commit a4f1aa2e6306951f84f03819bbd8c135f480248c
Author: Igor Soarez <soa...@apple.com>
AuthorDate: Wed Apr 10 10:57:05 2024 +0100

    KAFKA-16297: Race condition while promoting future replica (#15557)

    If a future replica doesn't get promoted, any directory reassignment
    sent to the controller should be reversed. The current logic already
    addresses the case where the replica hasn't yet been promoted and the
    controller hasn't yet acknowledged the directory reassignment. However,
    it doesn't cover the case where the replica does not get promoted
    because of a directory failure after the controller has acknowledged
    the reassignment, but before the future replica catches up again and
    is promoted to be the main replica.

    Reviewers: Luke Chen <show...@gmail.com>
---
 .../kafka/server/ReplicaAlterLogDirsThread.scala   | 119 +++++++++++++++------
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  64 +++++++++--
 .../controller/ReplicationControlManagerTest.java  |  20 +++-
 3 files changed, 159 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index b49144f840b..33e6afe5cc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import kafka.cluster.Partition
-import kafka.server.ReplicaAlterLogDirsThread.{DirectoryEventRequestState, QUEUED}
-import org.apache.kafka.common.TopicPartition
+import kafka.server.ReplicaAlterLogDirsThread.{PromotionState, ReassignmentState}
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.requests.FetchResponse
 import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch, TopicIdPartition}
 import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}
@@ -45,7 +45,8 @@ class ReplicaAlterLogDirsThread(name: String,
                                 isInterruptible = false,
                                 brokerTopicStats) {
 
-  private val assignmentRequestStates: ConcurrentHashMap[TopicPartition, DirectoryEventRequestState] = new ConcurrentHashMap()
+  // Visible for testing
+  private[server] val promotionStates: ConcurrentHashMap[TopicPartition, PromotionState] = new ConcurrentHashMap()
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
     replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
@@ -96,23 +97,26 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }
+    }
 
     super.removePartitions(topicPartitions)
   }
 
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionStates.get(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    log.debug(s"Updating future replica ${topicPartition} reassignment state to ${state}")
+    promotionStates.put(topicPartition, promotionStates.get(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
@@ -120,33 +124,28 @@ class ReplicaAlterLogDirsThread(name: String,
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")
 
-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
-          partition.futureReplicaDirectoryId()
-            .map(id => {
-              directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), id,
-                () => updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
-              // mark the assignment request state as queued.
-              updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
-            })
+          val targetDir = partition.futureReplicaDirectoryId().get
+          val topicIdPartition = new TopicIdPartition(topicId.get, topicPartition.partition())
+          directoryEventHandler.handleAssignment(topicIdPartition, targetDir, () => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
+          updateReassignmentState(topicPartition, ReassignmentState.Queued)
         })
-      case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
+      case ReassignmentState.Accepted =>
         // Promote future replica if controller accepted the request and the replica caught-up with the original log.
         if (partition.maybeReplaceCurrentWithFutureReplica()) {
+          updateReassignmentState(topicPartition, ReassignmentState.Effective)
           removePartitions(Set(topicPartition))
-          assignmentRequestStates.remove(topicPartition)
         }
-      case _ =>
-        log.trace("Waiting for AssignmentRequest to succeed before promoting the future replica.")
+      case ReassignmentState.Queued =>
+        log.trace("Waiting for AssignReplicasToDirsRequest to succeed before promoting the future replica.")
+      case ReassignmentState.Effective =>
+        throw new IllegalStateException("BUG: trying to promote a future replica twice")
     }
   }
 
-  private def partitionAssignmentRequestState(topicPartition: TopicPartition): Option[DirectoryEventRequestState] = {
-    Option(assignmentRequestStates.get(topicPartition))
-  }
-
   override def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
     partitionMapLock.lockInterruptibly()
     try {
@@ -155,6 +154,13 @@ class ReplicaAlterLogDirsThread(name: String,
       val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
         replicaMgr.futureLogExists(tp)
       }
+      filteredFetchStates.foreach {
+        case (topicPartition, state) =>
+          val topicId = state.topicId
+          val currentDirectoryId = replicaMgr.getPartitionOrException(topicPartition).logDirectoryId()
+          val promotionState = PromotionState(ReassignmentState.None, topicId, currentDirectoryId)
+          promotionStates.put(topicPartition, promotionState)
+      }
       super.addPartitions(filteredFetchStates)
     } finally {
       partitionMapLock.unlock()
     }
   }
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory assignment update in the metadata
+   * @param topicId           The ID of the topic, which is useful if reverting the assignment is required
+   * @param currentDir        The original directory ID, which the future replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: Option[Uuid], currentDir: Option[Uuid]) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be inconsistent with the actual
+     *         directory where the main replica is hosted.
+     */
+    def maybeInconsistentMetadata: Boolean = false
+  }
+
+  object ReassignmentState {
 
-  case object QUEUED extends DirectoryEventRequestState
+    /**
+     * The request has not been created.
+     */
+    case object None extends ReassignmentState
 
-  case object COMPLETED extends DirectoryEventRequestState
+    /**
+     * The request has been queued; it may or may not yet have been sent to the Controller.
+     */
+    case object Queued extends ReassignmentState {
+      override def maybeInconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The controller has acknowledged the new directory assignment and persisted the change in metadata.
+     */
+    case object Accepted extends ReassignmentState {
+      override def maybeInconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The future replica has been promoted and replaced the current replica.
+     */
+    case object Effective extends ReassignmentState
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 2118e9fffb9..fe6161ece1f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -20,6 +20,7 @@ import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
 import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -30,12 +31,13 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.common
 import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers.{any, anyBoolean}
-import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, when}
+import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, verifyNoMoreInteractions, when}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
@@ -129,6 +131,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(partition.futureLocalLogOrException).thenReturn(futureLog)
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("gOZOXHnkR9eiA1W9ZuLk8A")))
 
     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -228,6 +231,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(partition.futureLocalLogOrException).thenReturn(futureLog)
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("PGLOjDjKQaCOXFOtxymIig")))
 
     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -268,9 +272,9 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(0, thread.partitionCount)
   }
 
-  def updateAssignmentRequestState(thread: ReplicaAlterLogDirsThread, partitionId:Int, newState: ReplicaAlterLogDirsThread.DirectoryEventRequestState) = {
+  private def updateReassignmentState(thread: ReplicaAlterLogDirsThread, partitionId:Int, newState: ReassignmentState) = {
     topicNames.get(topicId).map(topicName => {
-      thread.updatedAssignmentRequestState(new TopicPartition(topicName, partitionId))(newState)
+      thread.updateReassignmentState(new TopicPartition(topicName, partitionId), newState)
     })
   }
 
@@ -290,6 +294,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     val leaderEpoch = 5
     val logEndOffset = 0
+    val currentDirectoryId = Uuid.fromString("EzI9SqkFQKW1iFc1ZwP9SQ")
 
     when(partition.partitionId).thenReturn(partitionId)
     when(partition.topicId).thenReturn(Some(topicId))
@@ -312,6 +317,7 @@ class ReplicaAlterLogDirsThreadTest {
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
     when(partition.runCallbackIfFutureReplicaCaughtUp(any())).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(currentDirectoryId))
 
     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -353,13 +359,13 @@ class ReplicaAlterLogDirsThreadTest {
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)
 
-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.QUEUED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Queued)
 
     // Don't promote future replica if assignment request is queued but not completed
     thread.doWork()
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)
 
-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.COMPLETED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Accepted)
 
     // Promote future replica if assignment request is completed
     thread.doWork()
@@ -448,7 +454,7 @@ class ReplicaAlterLogDirsThreadTest {
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)
 
-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.QUEUED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Queued)
 
     // revert assignment and delete request state if assignment is cancelled
     thread.removePartitions(Set(t1p0))
@@ -464,6 +470,40 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(partition.logDirectoryId().get, logIdCaptureT1p0.getValue)
   }
 
+  @Test
+  def shouldRevertReassignmentsForIncompleteFutureReplicaPromotions(): Unit = {
+    val replicaManager = Mockito.mock(classOf[ReplicaManager])
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+    val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, quotaManager)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread",
+      leader,
+      failedPartitions,
+      replicaManager,
+      quotaManager,
+      Mockito.mock(classOf[BrokerTopicStats]),
+      0,
+      directoryEventHandler)
+
+    val tp = Seq.range(0, 4).map(new TopicPartition("t", _))
+    val tips = Seq.range(0, 4).map(new common.TopicIdPartition(topicId, _))
+    val dirIds = Seq.range(0, 4).map(i => Uuid.fromString(s"TESTBROKER0000DIR${i}AAAA"))
+    tp.foreach(tp => thread.promotionStates.put(tp, ReplicaAlterLogDirsThread.PromotionState(ReassignmentState.None, Some(topicId), Some(dirIds(tp.partition())))))
+    thread.updateReassignmentState(tp(0), ReassignmentState.None)
+    thread.updateReassignmentState(tp(1), ReassignmentState.Queued)
+    thread.updateReassignmentState(tp(2), ReassignmentState.Accepted)
+    thread.updateReassignmentState(tp(3), ReassignmentState.Effective)
+
+    thread.removePartitions(tp.toSet)
+
+    verify(directoryEventHandler).handleAssignment(ArgumentMatchers.eq(tips(1)), ArgumentMatchers.eq(dirIds(1)), any())
+    verify(directoryEventHandler).handleAssignment(ArgumentMatchers.eq(tips(2)), ArgumentMatchers.eq(dirIds(2)), any())
+    verifyNoMoreInteractions(directoryEventHandler)
+  }
+
   private def mockFetchFromCurrentLog(topicIdPartition: TopicIdPartition,
                                       requestData: FetchRequest.PartitionData,
                                       config: KafkaConfig,
@@ -691,6 +731,8 @@ class ReplicaAlterLogDirsThreadTest {
       .setErrorCode(Errors.NONE.code)
       .setLeaderEpoch(leaderEpoch)
       .setEndOffset(replicaT1p1LEO))
+    when(partitionT1p0.logDirectoryId()).thenReturn(Some(Uuid.fromString("Jsg8ufNCQYONNquPt7VYpA")))
+    when(partitionT1p1.logDirectoryId()).thenReturn(Some(Uuid.fromString("D2Yf6FtNROGVKoIZadSFIg")))
 
     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(logT1p0, logT1p1, futureLogT1p0, partitionT1p0, replicaManager, responseCallback)
@@ -775,6 +817,7 @@ class ReplicaAlterLogDirsThreadTest {
         .setEndOffset(replicaEpochEndOffset))
     when(futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn(
       Some(new OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("n6WOe2zPScqZLIreCWN6Ug")))
     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
@@ -829,6 +872,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
 
     when(replicaManager.logManager).thenReturn(logManager)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("b2e1ihvGQiu6A504oKoddQ")))
 
     // pretend this is a completely new future replica, with no leader epochs recorded
     when(futureLog.latestEpoch).thenReturn(None)
@@ -880,6 +924,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Stubs
     when(partition.partitionId).thenReturn(partitionId)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("wO7bUpvcSZC0QKEK6P6AiA")))
     when(replicaManager.metadataCache).thenReturn(metadataCache)
 
     when(replicaManager.getPartitionOrException(t1p0))
@@ -967,6 +1012,7 @@ class ReplicaAlterLogDirsThreadTest {
     val replicaLEO = 213
 
     when(partition.partitionId).thenReturn(partitionId)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("dybMM9CpRP2s6HSslW4NHg")))
     when(replicaManager.metadataCache).thenReturn(metadataCache)
 
     when(replicaManager.getPartitionOrException(t1p0))
@@ -1025,6 +1071,9 @@ class ReplicaAlterLogDirsThreadTest {
     //Stubs
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p1)).thenReturn(partition)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("Y0qUL19gSmKAXmohmrUM4g")))
     stub(log, null, futureLog, partition, replicaManager)
 
     //Create the fetcher thread
@@ -1076,6 +1125,9 @@ class ReplicaAlterLogDirsThreadTest {
     when(futureLog.logStartOffset).thenReturn(startOffset)
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p1)).thenReturn(partition)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("rtrdy3nsQwO1OQUEUYGxRQ")))
     stub(log, null, futureLog, partition, replicaManager)
 
     //Create the fetcher thread
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index bf7f6c82e05..4c397657417 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -2867,15 +2867,17 @@ public class ReplicationControlManagerTest {
         Uuid dir1b1 = Uuid.fromString("hO2YI5bgRUmByNPHiHxjNQ");
         Uuid dir2b1 = Uuid.fromString("R3Gb1HLoTzuKMgAkH5Vtpw");
         Uuid dir1b2 = Uuid.fromString("TBGa8UayQi6KguqF5nC0sw");
+        Uuid offlineDir = Uuid.fromString("zvAf9BKZRyyrEWz4FX2nLA");
         ctx.registerBrokersWithDirs(1, asList(dir1b1, dir2b1), 2, singletonList(dir1b2));
         ctx.unfenceBrokers(1, 2);
-        Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
+        Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}, new int[]{1, 2}}).topicId();
         Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
         Uuid topicC = ctx.createTestTopic("c", new int[][]{new int[]{2}}).topicId();
 
         ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, new HashMap<TopicIdPartition, Uuid>() {{
                 put(new TopicIdPartition(topicA, 0), dir1b1);
                 put(new TopicIdPartition(topicA, 1), dir2b1);
+                put(new TopicIdPartition(topicA, 2), offlineDir); // unknown/offline dir
                 put(new TopicIdPartition(topicB, 0), dir1b1);
                 put(new TopicIdPartition(topicB, 1), DirectoryId.LOST);
                 put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), dir2b1); // expect UNKNOWN_TOPIC_ID
@@ -2894,6 +2896,9 @@ public class ReplicationControlManagerTest {
                 put(new TopicIdPartition(topicA, 1), NONE);
                 put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), UNKNOWN_TOPIC_ID);
             }});
+            put(offlineDir, new HashMap<TopicIdPartition, Errors>() {{
+                put(new TopicIdPartition(topicA, 2), NONE);
+            }});
             put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>() {{
                 put(new TopicIdPartition(topicB, 1), NONE);
             }});
@@ -2906,6 +2911,9 @@ public class ReplicationControlManagerTest {
             new ApiMessageAndVersion(
                 new PartitionChangeRecord().setTopicId(topicA).setPartitionId(1).
                     setDirectories(asList(dir2b1, dir1b2)), recordVersion),
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).
+                    setDirectories(asList(offlineDir, dir1b2)), recordVersion),
             new ApiMessageAndVersion(
                 new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).
                     setDirectories(asList(dir1b1, dir1b2)), recordVersion),
@@ -2913,8 +2921,13 @@ public class ReplicationControlManagerTest {
                 new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
                     setDirectories(asList(DirectoryId.LOST, dir1b2)), recordVersion),
 
-            // In addition to the directory assignment changes we expect an additional record,
-            // which elects a new leader for bar-1 which has been assigned to an offline directory.
+            // In addition to the directory assignment changes we expect two additional records,
+            // which elect new leaders for:
+            //   - a-2, which has been assigned to a directory that is not registered as online (unknown/offline)
+            //   - b-1, which has been assigned to an offline directory
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).
+                    setIsr(singletonList(2)).setLeader(2), recordVersion),
             new ApiMessageAndVersion(
                 new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
                     setIsr(singletonList(2)).setLeader(2), recordVersion)
 
@@ -2927,6 +2940,7 @@ public class ReplicationControlManagerTest {
             add(new TopicIdPartition(topicB, 0));
         }}, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(1, true)));
         assertEquals(new HashSet<TopicIdPartition>() {{
+            add(new TopicIdPartition(topicA, 2));
            add(new TopicIdPartition(topicB, 1));
            add(new TopicIdPartition(topicC, 0));
        }},
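
Note: the promotion and revert logic in this patch reduces to a small state machine: a directory reassignment can leave cluster metadata inconsistent only while it is Queued or Accepted, so those are exactly the states removePartitions must revert. Below is a minimal, self-contained Scala sketch of that rule, for illustration only; it is not part of the commit, and the names ReassignmentStateSketch, State, and mustRevert are invented here.

    // Standalone sketch of the ReassignmentState rule introduced by this patch.
    object ReassignmentStateSketch {

      sealed trait State {
        // true if cluster metadata may already point at the future replica's
        // directory while the main replica still lives in the original one
        def maybeInconsistentMetadata: Boolean = false
      }
      // Mirrors the four states in ReplicaAlterLogDirsThread.ReassignmentState.
      // Note: this None intentionally shadows scala.None inside this object.
      case object None extends State
      case object Queued extends State { override def maybeInconsistentMetadata = true }
      case object Accepted extends State { override def maybeInconsistentMetadata = true }
      case object Effective extends State

      // Revert the directory assignment on removePartitions whenever the metadata
      // may be inconsistent. Before this patch only Queued was reverted; the race
      // fixed here is Accepted: a directory failure can strike after the controller
      // has persisted the assignment but before the future replica is promoted.
      def mustRevert(state: State): Boolean = state.maybeInconsistentMetadata

      def main(args: Array[String]): Unit =
        Seq(None, Queued, Accepted, Effective).foreach { s =>
          println(s"$s -> revert on removePartitions: ${mustRevert(s)}")
        }
    }

Running the sketch prints true only for Queued and Accepted, which matches the new shouldRevertReassignmentsForIncompleteFutureReplicaPromotions test above: reverts are verified for exactly partitions 1 (Queued) and 2 (Accepted).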