This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new a4f1aa2e630 KAFKA-16297: Race condition while promoting future replica (#15557)
a4f1aa2e630 is described below

commit a4f1aa2e6306951f84f03819bbd8c135f480248c
Author: Igor Soarez <soa...@apple.com>
AuthorDate: Wed Apr 10 10:57:05 2024 +0100

    KAFKA-16297: Race condition while promoting future replica (#15557)
    
    If a future replica doesn't get promoted, any directory reassignment sent to the controller should be reversed.
    
    The current logic already addresses the case where a replica hasn't yet been promoted and the controller hasn't yet acknowledged the directory reassignment. However, it doesn't cover the case where the replica does not get promoted due to a directory failure after the controller has acknowledged the reassignment but before the future replica catches up again and is promoted to the main replica.
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../kafka/server/ReplicaAlterLogDirsThread.scala   | 119 +++++++++++++++------
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  64 +++++++++--
 .../controller/ReplicationControlManagerTest.java  |  20 +++-
 3 files changed, 159 insertions(+), 44 deletions(-)
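
As a rough illustration of the promotion state machine this change introduces, here is a minimal, self-contained Scala sketch (the names mirror the patch, but this is a simplified model, not the committed code):

object ReassignmentStateSketch {

  sealed trait ReassignmentState {
    // True while the directory assignment recorded in cluster metadata may
    // disagree with the directory that actually hosts the main replica.
    def maybeInconsistentMetadata: Boolean = false
  }

  case object None extends ReassignmentState // request not yet created
  case object Queued extends ReassignmentState { // queued, possibly not yet sent
    override def maybeInconsistentMetadata: Boolean = true
  }
  case object Accepted extends ReassignmentState { // controller persisted the change
    override def maybeInconsistentMetadata: Boolean = true
  }
  case object Effective extends ReassignmentState // future replica was promoted

  // The fix: on removal before promotion completes, revert whenever metadata
  // may be inconsistent, which now covers Accepted as well as Queued.
  def mustRevert(state: ReassignmentState): Boolean = state.maybeInconsistentMetadata

  def main(args: Array[String]): Unit =
    Seq[ReassignmentState](None, Queued, Accepted, Effective)
      .foreach(s => println(s"$s -> revert on removal: ${mustRevert(s)}"))
}

The point of the fix is that Accepted, and not just Queued, reports maybeInconsistentMetadata = true, so removing a partition before promotion completes now triggers a revert in both states.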

diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index b49144f840b..33e6afe5cc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -18,8 +18,8 @@
 package kafka.server
 
 import kafka.cluster.Partition
-import kafka.server.ReplicaAlterLogDirsThread.{DirectoryEventRequestState, QUEUED}
-import org.apache.kafka.common.TopicPartition
+import kafka.server.ReplicaAlterLogDirsThread.{PromotionState, ReassignmentState}
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.requests.FetchResponse
 import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch, TopicIdPartition}
 import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}
@@ -45,7 +45,8 @@ class ReplicaAlterLogDirsThread(name: String,
                                 isInterruptible = false,
                                 brokerTopicStats) {
 
-  private val assignmentRequestStates: ConcurrentHashMap[TopicPartition, DirectoryEventRequestState] = new ConcurrentHashMap()
+  // Visible for testing
+  private[server] val promotionStates: ConcurrentHashMap[TopicPartition, PromotionState] = new ConcurrentHashMap()
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
     replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
@@ -96,23 +97,26 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }
+    }
 
     super.removePartitions(topicPartitions)
   }
 
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionStates.get(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    log.debug(s"Updating future replica ${topicPartition} reassignment state to ${state}")
+    promotionStates.put(topicPartition, promotionStates.get(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
@@ -120,33 +124,28 @@ class ReplicaAlterLogDirsThread(name: String,
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")
 
-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
-          partition.futureReplicaDirectoryId()
-            .map(id => {
-              directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), id,
-                () => updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
-              // mark the assignment request state as queued.
-              updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
-            })
+          val targetDir = partition.futureReplicaDirectoryId().get
+          val topicIdPartition = new TopicIdPartition(topicId.get, topicPartition.partition())
+          directoryEventHandler.handleAssignment(topicIdPartition, targetDir, () => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
+          updateReassignmentState(topicPartition, ReassignmentState.Queued)
         })
-      case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
+      case ReassignmentState.Accepted =>
         // Promote future replica if controller accepted the request and the replica caught-up with the original log.
         if (partition.maybeReplaceCurrentWithFutureReplica()) {
+          updateReassignmentState(topicPartition, ReassignmentState.Effective)
           removePartitions(Set(topicPartition))
-          assignmentRequestStates.remove(topicPartition)
         }
-      case _ =>
-        log.trace("Waiting for AssignmentRequest to succeed before promoting 
the future replica.")
+      case ReassignmentState.Queued =>
+        log.trace("Waiting for AssignReplicasToDirsRequest to succeed before 
promoting the future replica.")
+      case ReassignmentState.Effective =>
+        throw new IllegalStateException("BUG: trying to promote a future 
replica twice")
     }
   }
 
-  private def partitionAssignmentRequestState(topicPartition: TopicPartition): Option[DirectoryEventRequestState] = {
-    Option(assignmentRequestStates.get(topicPartition))
-  }
-
   override def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
     partitionMapLock.lockInterruptibly()
     try {
@@ -155,6 +154,13 @@ class ReplicaAlterLogDirsThread(name: String,
       val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
         replicaMgr.futureLogExists(tp)
       }
+      filteredFetchStates.foreach {
+        case (topicPartition, state) =>
+          val topicId = state.topicId
+          val currentDirectoryId = replicaMgr.getPartitionOrException(topicPartition).logDirectoryId()
+          val promotionState = PromotionState(ReassignmentState.None, topicId, currentDirectoryId)
+          promotionStates.put(topicPartition, promotionState)
+      }
       super.addPartitions(filteredFetchStates)
     } finally {
       partitionMapLock.unlock()
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory assignment update in the metadata
+   * @param topicId           The ID of the topic, which is useful if reverting the assignment is required
+   * @param currentDir        The original directory ID from which the future replica fetches
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: Option[Uuid], currentDir: Option[Uuid]) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be inconsistent with the actual
+     *         directory where the main replica is hosted.
+     */
+    def maybeInconsistentMetadata: Boolean = false
+  }
+
+  object ReassignmentState {
 
-  case object QUEUED extends DirectoryEventRequestState
+    /**
+     * The request has not been created.
+     */
+    case object None extends ReassignmentState
 
-  case object COMPLETED extends DirectoryEventRequestState
+    /**
+     * The request has been queued; it may or may not yet have been sent to the Controller.
+     */
+    case object Queued extends ReassignmentState {
+      override def maybeInconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The controller has acknowledged the new directory assignment and persisted the change in metadata.
+     */
+    case object Accepted extends ReassignmentState {
+      override def maybeInconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The future replica has been promoted and replaced the current replica.
+     */
+    case object Effective extends ReassignmentState
+  }
 }
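
The revert path in removePartitions above can be summarised with the following simplified, self-contained Scala sketch; TopicPartition, Uuid and revertAssignment below are hypothetical stand-ins for the real kafka.common types and DirectoryEventHandler.handleAssignment:

import java.util.concurrent.ConcurrentHashMap

object RevertOnRemovalSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class Uuid(value: String)

  sealed trait ReassignmentState { def maybeInconsistentMetadata: Boolean }
  case object Queued extends ReassignmentState { val maybeInconsistentMetadata = true }
  case object Accepted extends ReassignmentState { val maybeInconsistentMetadata = true }
  case object Effective extends ReassignmentState { val maybeInconsistentMetadata = false }

  final case class PromotionState(reassignmentState: ReassignmentState,
                                  topicId: Option[Uuid],
                                  currentDir: Option[Uuid])

  private val promotionStates = new ConcurrentHashMap[TopicPartition, PromotionState]()

  // Stand-in for DirectoryEventHandler.handleAssignment: queues an
  // AssignReplicasToDirs update pointing back at the original directory.
  private def revertAssignment(tp: TopicPartition, dir: Uuid): Unit =
    println(s"reverting $tp to directory ${dir.value}")

  def removePartitions(tps: Set[TopicPartition]): Unit =
    tps.foreach { tp =>
      Option(promotionStates.remove(tp)).foreach {
        case PromotionState(state, Some(_), Some(originalDir)) if state.maybeInconsistentMetadata =>
          // Promotion never completed, so undo the directory change in metadata.
          revertAssignment(tp, originalDir)
        case _ => // promoted (Effective) or untracked: nothing to undo
      }
    }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("t", 0)
    // The newly covered case: the controller accepted the reassignment, then
    // the future directory failed before the replica could be promoted.
    promotionStates.put(tp, PromotionState(Accepted, Some(Uuid("topic-id")), Some(Uuid("original-dir"))))
    removePartitions(Set(tp)) // prints the revert
  }
}
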
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 2118e9fffb9..fe6161ece1f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -20,6 +20,7 @@ import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
 import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -30,12 +31,13 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.common
 import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers.{any, anyBoolean}
-import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, when}
+import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, verifyNoMoreInteractions, when}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
@@ -129,6 +131,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(partition.futureLocalLogOrException).thenReturn(futureLog)
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("gOZOXHnkR9eiA1W9ZuLk8A")))
 
     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -228,6 +231,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(partition.futureLocalLogOrException).thenReturn(futureLog)
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("PGLOjDjKQaCOXFOtxymIig")))
 
     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -268,9 +272,9 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(0, thread.partitionCount)
   }
 
-  def updateAssignmentRequestState(thread: ReplicaAlterLogDirsThread, partitionId:Int, newState: ReplicaAlterLogDirsThread.DirectoryEventRequestState) = {
+  private def updateReassignmentState(thread: ReplicaAlterLogDirsThread, partitionId:Int, newState: ReassignmentState) = {
     topicNames.get(topicId).map(topicName => {
-      thread.updatedAssignmentRequestState(new TopicPartition(topicName, partitionId))(newState)
+      thread.updateReassignmentState(new TopicPartition(topicName, partitionId), newState)
     })
   }
 
@@ -290,6 +294,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     val leaderEpoch = 5
     val logEndOffset = 0
+    val currentDirectoryId = Uuid.fromString("EzI9SqkFQKW1iFc1ZwP9SQ")
 
     when(partition.partitionId).thenReturn(partitionId)
     when(partition.topicId).thenReturn(Some(topicId))
@@ -312,6 +317,7 @@ class ReplicaAlterLogDirsThreadTest {
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
     when(partition.runCallbackIfFutureReplicaCaughtUp(any())).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(currentDirectoryId))
 
     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -353,13 +359,13 @@ class ReplicaAlterLogDirsThreadTest {
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)
 
-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.QUEUED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Queued)
 
     // Don't promote future replica if assignment request is queued but not completed
     thread.doWork()
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)
-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.COMPLETED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Accepted)
 
     // Promote future replica if assignment request is completed
     thread.doWork()
@@ -448,7 +454,7 @@ class ReplicaAlterLogDirsThreadTest {
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)
 
-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.QUEUED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Queued)
 
     // revert assignment and delete request state if assignment is cancelled
     thread.removePartitions(Set(t1p0))
@@ -464,6 +470,40 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(partition.logDirectoryId().get, logIdCaptureT1p0.getValue)
   }
 
+  @Test
+  def shouldRevertReassignmentsForIncompleteFutureReplicaPromotions(): Unit = {
+    val replicaManager = Mockito.mock(classOf[ReplicaManager])
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+    val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, quotaManager)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread",
+      leader,
+      failedPartitions,
+      replicaManager,
+      quotaManager,
+      Mockito.mock(classOf[BrokerTopicStats]),
+      0,
+      directoryEventHandler)
+
+    val tp = Seq.range(0, 4).map(new TopicPartition("t", _))
+    val tips = Seq.range(0, 4).map(new common.TopicIdPartition(topicId, _))
+    val dirIds = Seq.range(0, 4).map(i => Uuid.fromString(s"TESTBROKER0000DIR${i}AAAA"))
+    tp.foreach(tp => thread.promotionStates.put(tp, ReplicaAlterLogDirsThread.PromotionState(ReassignmentState.None, Some(topicId), Some(dirIds(tp.partition())))))
+    thread.updateReassignmentState(tp(0), ReassignmentState.None)
+    thread.updateReassignmentState(tp(1), ReassignmentState.Queued)
+    thread.updateReassignmentState(tp(2), ReassignmentState.Accepted)
+    thread.updateReassignmentState(tp(3), ReassignmentState.Effective)
+
+    thread.removePartitions(tp.toSet)
+
+    verify(directoryEventHandler).handleAssignment(ArgumentMatchers.eq(tips(1)), ArgumentMatchers.eq(dirIds(1)), any())
+    verify(directoryEventHandler).handleAssignment(ArgumentMatchers.eq(tips(2)), ArgumentMatchers.eq(dirIds(2)), any())
+    verifyNoMoreInteractions(directoryEventHandler)
+  }
+
   private def mockFetchFromCurrentLog(topicIdPartition: TopicIdPartition,
                                       requestData: FetchRequest.PartitionData,
                                       config: KafkaConfig,
@@ -691,6 +731,8 @@ class ReplicaAlterLogDirsThreadTest {
         .setErrorCode(Errors.NONE.code)
         .setLeaderEpoch(leaderEpoch)
         .setEndOffset(replicaT1p1LEO))
+    when(partitionT1p0.logDirectoryId()).thenReturn(Some(Uuid.fromString("Jsg8ufNCQYONNquPt7VYpA")))
+    when(partitionT1p1.logDirectoryId()).thenReturn(Some(Uuid.fromString("D2Yf6FtNROGVKoIZadSFIg")))
 
     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(logT1p0, logT1p1, futureLogT1p0, partitionT1p0, replicaManager, responseCallback)
@@ -775,6 +817,7 @@ class ReplicaAlterLogDirsThreadTest {
         .setEndOffset(replicaEpochEndOffset))
     when(futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn(
       Some(new OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("n6WOe2zPScqZLIreCWN6Ug")))
 
     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
@@ -829,6 +872,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
 
     when(replicaManager.logManager).thenReturn(logManager)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("b2e1ihvGQiu6A504oKoddQ")))
 
     // pretend this is a completely new future replica, with no leader epochs recorded
     when(futureLog.latestEpoch).thenReturn(None)
@@ -880,6 +924,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Stubs
     when(partition.partitionId).thenReturn(partitionId)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("wO7bUpvcSZC0QKEK6P6AiA")))
 
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.getPartitionOrException(t1p0))
@@ -967,6 +1012,7 @@ class ReplicaAlterLogDirsThreadTest {
     val replicaLEO = 213
 
     when(partition.partitionId).thenReturn(partitionId)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("dybMM9CpRP2s6HSslW4NHg")))
 
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.getPartitionOrException(t1p0))
@@ -1025,6 +1071,9 @@ class ReplicaAlterLogDirsThreadTest {
     //Stubs
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p1)).thenReturn(partition)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("Y0qUL19gSmKAXmohmrUM4g")))
     stub(log, null, futureLog, partition, replicaManager)
 
     //Create the fetcher thread
@@ -1076,6 +1125,9 @@ class ReplicaAlterLogDirsThreadTest {
     when(futureLog.logStartOffset).thenReturn(startOffset)
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p1)).thenReturn(partition)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("rtrdy3nsQwO1OQUEUYGxRQ")))
     stub(log, null, futureLog, partition, replicaManager)
 
     //Create the fetcher thread
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index bf7f6c82e05..4c397657417 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -2867,15 +2867,17 @@ public class ReplicationControlManagerTest {
         Uuid dir1b1 = Uuid.fromString("hO2YI5bgRUmByNPHiHxjNQ");
         Uuid dir2b1 = Uuid.fromString("R3Gb1HLoTzuKMgAkH5Vtpw");
         Uuid dir1b2 = Uuid.fromString("TBGa8UayQi6KguqF5nC0sw");
+        Uuid offlineDir = Uuid.fromString("zvAf9BKZRyyrEWz4FX2nLA");
         ctx.registerBrokersWithDirs(1, asList(dir1b1, dir2b1), 2, singletonList(dir1b2));
         ctx.unfenceBrokers(1, 2);
-        Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
+        Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}, new int[]{1, 2}}).topicId();
         Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
         Uuid topicC = ctx.createTestTopic("c", new int[][]{new int[]{2}}).topicId();
 
         ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, new HashMap<TopicIdPartition, Uuid>() {{
                 put(new TopicIdPartition(topicA, 0), dir1b1);
                 put(new TopicIdPartition(topicA, 1), dir2b1);
+                put(new TopicIdPartition(topicA, 2), offlineDir); // unknown/offline dir
                 put(new TopicIdPartition(topicB, 0), dir1b1);
                 put(new TopicIdPartition(topicB, 1), DirectoryId.LOST);
                put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), dir2b1); // expect UNKNOWN_TOPIC_ID
@@ -2894,6 +2896,9 @@ public class ReplicationControlManagerTest {
                         put(new TopicIdPartition(topicA, 1), NONE);
                        put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), UNKNOWN_TOPIC_ID);
                     }});
+                put(offlineDir, new HashMap<TopicIdPartition, Errors>() {{
+                        put(new TopicIdPartition(topicA, 2), NONE);
+                    }});
                put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>() {{
                         put(new TopicIdPartition(topicB, 1), NONE);
                     }});
@@ -2906,6 +2911,9 @@ public class ReplicationControlManagerTest {
                 new ApiMessageAndVersion(
                        new PartitionChangeRecord().setTopicId(topicA).setPartitionId(1).
                                setDirectories(asList(dir2b1, dir1b2)), recordVersion),
+                new ApiMessageAndVersion(
+                        new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).
+                                setDirectories(asList(offlineDir, dir1b2)), recordVersion),
                 new ApiMessageAndVersion(
                        new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).
                                setDirectories(asList(dir1b1, dir1b2)), recordVersion),
@@ -2913,8 +2921,13 @@ public class ReplicationControlManagerTest {
                        new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
                                setDirectories(asList(DirectoryId.LOST, dir1b2)), recordVersion),
 
-                // In addition to the directory assignment changes we expect an additional record,
-                // which elects a new leader for bar-1 which has been assigned to an offline directory.
+                // In addition to the directory assignment changes we expect two additional records,
+                // which elect new leaders for:
+                //   - a-2 which has been assigned to a directory which is not an online directory (unknown/offline)
+                //   - b-1 which has been assigned to an offline directory.
+                new ApiMessageAndVersion(
+                        new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).
+                                setIsr(singletonList(2)).setLeader(2), recordVersion),
                 new ApiMessageAndVersion(
                        new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
                                setIsr(singletonList(2)).setLeader(2), recordVersion)
@@ -2927,6 +2940,7 @@ public class ReplicationControlManagerTest {
                 add(new TopicIdPartition(topicB, 0));
            }}, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(1, true)));
         assertEquals(new HashSet<TopicIdPartition>() {{
+                add(new TopicIdPartition(topicA, 2));
                 add(new TopicIdPartition(topicB, 1));
                 add(new TopicIdPartition(topicC, 0));
             }},
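
The controller-side expectation the test encodes (a replica reporting into an unknown or offline directory forces a leader change) can be sketched roughly as follows; electLeader and the types below are illustrative stand-ins, not the ReplicationControlManager API:

object OfflineDirLeaderSketch {
  final case class Uuid(value: String)
  final case class PartitionState(replicas: Seq[Int], leader: Int, dirs: Map[Int, Uuid])

  // A replica is an eligible leader only if the directory it reports is one of
  // its broker's registered online directories. Simplified: ignores ISR rules.
  def electLeader(p: PartitionState, onlineDirs: Map[Int, Set[Uuid]]): Int =
    p.replicas
      .find(broker => onlineDirs.getOrElse(broker, Set.empty[Uuid]).contains(p.dirs(broker)))
      .getOrElse(-1) // -1: no eligible leader

  def main(args: Array[String]): Unit = {
    val offlineDir = Uuid("zvAf9BKZRyyrEWz4FX2nLA") // the unregistered dir from the test
    val dir1b2 = Uuid("TBGa8UayQi6KguqF5nC0sw")
    // a-2: broker 1 reported an unknown/offline directory, broker 2 is healthy.
    val a2 = PartitionState(Seq(1, 2), leader = 1,
      dirs = Map(1 -> offlineDir, 2 -> dir1b2))
    val online = Map(
      1 -> Set(Uuid("hO2YI5bgRUmByNPHiHxjNQ"), Uuid("R3Gb1HLoTzuKMgAkH5Vtpw")),
      2 -> Set(dir1b2))
    println(s"new leader for a-2: ${electLeader(a2, online)}") // 2, matching the test
  }
}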
