[ 
https://issues.apache.org/jira/browse/KAFKA-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453110#comment-16453110
 ] 

ASF GitHub Bot commented on KAFKA-6795:
---------------------------------------

lindong28 closed pull request #4918: KAFKA-6795: Added unit tests for 
ReplicaAlterLogDirsThread
URL: https://github.com/apache/kafka/pull/4918
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8d787c96da6..f919ddf017c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -434,7 +434,7 @@ case class PartitionFetchState(fetchOffset: Long, delay: 
DelayedItem, truncating
 
   def this(offset: Long, truncatingLog: Boolean) = this(offset, new 
DelayedItem(0), truncatingLog)
 
-  def this(offset: Long, delay: DelayedItem) = this(offset, new 
DelayedItem(0), false)
+  def this(offset: Long, delay: DelayedItem) = this(offset, delay, false)
 
   def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
 
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala 
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 48c83d43835..0faf5dc3838 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -58,8 +58,6 @@ class ReplicaAlterLogDirsThread(name: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
-  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  
replicaMgr.getReplica(tp).map(_.epochs.get)
-
   def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] 
= {
     var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = 
null
     val request = fetchRequest.underlying.build()
@@ -141,7 +139,13 @@ class ReplicaAlterLogDirsThread(name: String,
       delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
 
+  /**
+   * Builds offset for leader epoch requests for partitions that are in the 
truncating phase based
+   * on latest epochs of the future replicas (the one that is fetching)
+   */
   def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, 
PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
+    def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = 
replicaMgr.getReplica(tp, Request.FutureLocalReplicaId).map(_.epochs.get)
+
     val partitionEpochOpts = allPartitions
       .filter { case (_, state) => state.isTruncatingLog }
       .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
@@ -152,6 +156,11 @@ class ReplicaAlterLogDirsThread(name: String,
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
+  /**
+   * Fetches offset for leader epoch from local replica for each given topic 
partitions
+   * @param partitions map of topic partition -> leader epoch of the future 
replica
+   * @return map of topic partition -> end offset for a requested leader epoch
+   */
   def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): 
Map[TopicPartition, EpochEndOffset] = {
     partitions.map { case (tp, epoch) =>
       try {
@@ -263,4 +272,4 @@ object ReplicaAlterLogDirsThread {
 
     override def toString = underlying.toString
   }
-}
\ No newline at end of file
+}
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
new file mode 100644
index 00000000000..a0f1dae8c76
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -0,0 +1,526 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+
+import kafka.api.Request
+import kafka.cluster.{BrokerEndPoint, Replica, Partition}
+import kafka.log.LogManager
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
+import kafka.server.FetchPartitionData
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, 
KafkaStorageException}
+import kafka.utils.{DelayedItem, TestUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, 
FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.FetchResponse.PartitionData
+import 
org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH_OFFSET, 
UNDEFINED_EPOCH}
+import org.apache.kafka.common.utils.SystemTime
+import org.easymock.EasyMock._
+import org.easymock.{Capture, CaptureType, EasyMock, IAnswer}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+import scala.collection.{Map, mutable}
+
+class ReplicaAlterLogDirsThreadTest {
+
+  private val t1p0 = new TopicPartition("topic1", 0)
+  private val t1p1 = new TopicPartition("topic1", 1)
+
+  @Test
+  def issuesEpochRequestFromLocalReplica(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+
+    //Setup all dependencies
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpoch = 2
+    val leo = 13
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes()
+    stub(replica, replica, futureReplica, partition, replicaManager)
+
+    replay(leaderEpochs, replicaManager, replica)
+
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = null,
+      brokerTopicStats = null)
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> 
leaderEpoch))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, leo),
+      t1p1 -> new EpochEndOffset(Errors.NONE, leo)
+    )
+
+    assertEquals("results from leader epoch request should have offset from 
local replica",
+                 expected, result)
+  }
+
+  @Test
+  def fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+
+    //Setup all dependencies
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpoch = 2
+    val leo = 13
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes()
+    
expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
+    
expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p1)).andThrow(new 
KafkaStorageException).once()
+    
expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
+
+    replay(leaderEpochs, replicaManager, replica)
+
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = null,
+      brokerTopicStats = null)
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> 
leaderEpoch))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, leo),
+      t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, 
UNDEFINED_EPOCH_OFFSET)
+    )
+
+    assertEquals(expected, result)
+  }
+
+  @Test
+  def shouldTruncateToReplicaOffset(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochsT1p0 = createMock(classOf[LeaderEpochCache])
+    val leaderEpochsT1p1 = createMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaT1p0 = createNiceMock(classOf[Replica])
+    val replicaT1p1 = createNiceMock(classOf[Replica])
+    // one future replica mock because our mocking methods return same values 
for both future replicas
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => 
Unit]  = EasyMock.newCapture()
+
+    val leaderEpoch = 2
+    val futureReplicaLEO = 191
+    val replicaT1p0LEO = 190
+    val replicaT1p1LEO = 192
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
+    expect(replicaT1p0.epochs).andReturn(Some(leaderEpochsT1p0)).anyTimes()
+    expect(replicaT1p1.epochs).andReturn(Some(leaderEpochsT1p1)).anyTimes()
+    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+    expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes()
+    
expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn(replicaT1p0LEO).anyTimes()
+    
expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn(replicaT1p1LEO).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplica, partition, 
replicaManager, responseCallback)
+
+    replay(leaderEpochsT1p0, leaderEpochsT1p1, futureReplicaLeaderEpochs, 
replicaManager,
+           logManager, quotaManager, replicaT1p0, replicaT1p1, futureReplica, 
partition)
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    //Run it
+    thread.doWork()
+
+    //We should have truncated to the offsets in the response
+    assertTrue(truncateToCapture.getValues.asScala.contains(replicaT1p0LEO))
+    assertTrue(truncateToCapture.getValues.asScala.contains(futureReplicaLEO))
+  }
+
+  @Test
+  def shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset(): 
Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncated: Capture[Long] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => 
Unit]  = EasyMock.newCapture()
+
+    val initialFetchOffset = 100
+    val futureReplicaLEO = 111
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
+    expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+
+    // pretend this is a completely new future replica, with no leader epochs 
recorded
+    
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).anyTimes()
+
+    // since UNDEFINED_EPOCH is -1 wich will be lower than any valid leader 
epoch, the method
+    // will return UNDEFINED_EPOCH_OFFSET if requested epoch is lower than the 
first epoch cached
+    
expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn(UNDEFINED_EPOCH_OFFSET).anyTimes()
+    stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
+    replay(replicaManager, logManager, quotaManager, leaderEpochs, 
futureReplicaLeaderEpochs,
+           replica, futureReplica, partition)
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> initialFetchOffset))
+
+    //Run it
+    thread.doWork()
+
+    //We should have truncated to initial fetch offset
+    assertEquals("Expected future replica to truncate to initial fetch offset 
if replica returns UNDEFINED_EPOCH_OFFSET",
+                 initialFetchOffset, truncated.getValue)
+  }
+
+  @Test
+  def shouldPollIndefinitelyIfReplicaNotAvailable(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncated: Capture[Long] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quotaManager = 
createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[kafka.log.LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => 
Unit]  = EasyMock.newCapture()
+
+    val futureReplicaLeaderEpoch = 1
+    val futureReplicaLEO = 290
+    val replicaLEO = 300
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+
+    
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(futureReplicaLeaderEpoch).anyTimes()
+    
expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn(replicaLEO).anyTimes()
+
+    expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
+    expect(replicaManager.getReplica(t1p0, 
Request.FutureLocalReplicaId)).andReturn(Some(futureReplica)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p0, 
Request.FutureLocalReplicaId)).andReturn(futureReplica).anyTimes()
+    // this will cause fetchEpochsFromLeader return an error with undefined 
offset
+    expect(replicaManager.getReplicaOrException(t1p0)).andThrow(new 
ReplicaNotAvailableException("")).times(3)
+    
expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).once()
+    
expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.fetchMessages(
+      EasyMock.anyLong(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          responseCallback.getValue.apply(Seq.empty[(TopicPartition, 
FetchPartitionData)])
+        }
+      }).anyTimes()
+
+    replay(leaderEpochs, futureReplicaLeaderEpochs, replicaManager, 
logManager, quotaManager,
+           replica, futureReplica, partition)
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0))
+
+    // Run thread 3 times (exactly number of times we mock exception for 
getReplicaOrException)
+    (0 to 2).foreach { _ =>
+      thread.doWork()
+                     }
+
+    // Nothing happened since the replica was not available
+    assertEquals(0, truncated.getValues.size())
+
+    // Next time we loop, getReplicaOrException will return replica
+    thread.doWork()
+
+    // Now the final call should have actually done a truncation (to offset 
futureReplicaLEO)
+    assertEquals(futureReplicaLEO, truncated.getValue)
+  }
+
+  @Test
+  def shouldFetchLeaderEpochOnFirstFetchOnly(): Unit = {
+
+    //Setup all dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => 
Unit]  = EasyMock.newCapture()
+
+    val leaderEpoch = 5
+    val futureReplicaLEO = 190
+    val replicaLEO = 213
+
+    //Stubs
+    expect(partition.truncateTo(futureReplicaLEO, true)).once()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+
+    expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch)
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(replicaLEO)
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
+
+    replay(leaderEpochs, futureReplicaLeaderEpochs, replicaManager, 
logManager, quotaManager,
+           replica, futureReplica, partition)
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0))
+
+    // loop few times
+    (0 to 3).foreach { _ =>
+      thread.doWork()
+                     }
+
+    //Assert that truncate to is called exactly once (despite more loops)
+    verify(partition)
+  }
+
+  @Test
+  def shouldFetchOneReplicaAtATime(): Unit = {
+
+    //Setup all dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    //Stubs
+    expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replica, futureReplica, partition, replicaManager)
+
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, 
partition)
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    val ResultWithPartitions(fetchRequest, partitionsWithError) =
+      thread.buildFetchRequest(Seq((t1p0, new PartitionFetchState(150)), 
(t1p1, new PartitionFetchState(160))))
+
+    assertFalse(fetchRequest.isEmpty)
+    assertFalse(partitionsWithError.nonEmpty)
+    val request = fetchRequest.underlying.build()
+    assertEquals(0, request.minBytes)
+    val fetchInfos = request.fetchData.asScala.toSeq
+    assertEquals(1, fetchInfos.length)
+    assertEquals("Expected fetch request for largest partition", t1p1, 
fetchInfos.head._1)
+    assertEquals(160, fetchInfos.head._2.fetchOffset)
+  }
+
+  @Test
+  def shouldFetchNonDelayedAndNonTruncatingReplicas(): Unit = {
+
+    //Setup all dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    //Stubs
+    expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replica, futureReplica, partition, replicaManager)
+
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, 
partition)
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    // one partition is ready and one is truncating
+    val ResultWithPartitions(fetchRequest, partitionsWithError) =
+      thread.buildFetchRequest(Seq(
+        (t1p0, new PartitionFetchState(150)),
+        (t1p1, new PartitionFetchState(160, truncatingLog=true))))
+
+    assertFalse(fetchRequest.isEmpty)
+    assertFalse(partitionsWithError.nonEmpty)
+    val fetchInfos = fetchRequest.underlying.build().fetchData.asScala.toSeq
+    assertEquals(1, fetchInfos.length)
+    assertEquals("Expected fetch request for non-truncating partition", t1p0, 
fetchInfos.head._1)
+    assertEquals(150, fetchInfos.head._2.fetchOffset)
+
+    // one partition is ready and one is delayed
+    val ResultWithPartitions(fetchRequest2, partitionsWithError2) =
+      thread.buildFetchRequest(Seq(
+        (t1p0, new PartitionFetchState(140)),
+        (t1p1, new PartitionFetchState(160, delay=new DelayedItem(5000)))))
+
+    assertFalse(fetchRequest2.isEmpty)
+    assertFalse(partitionsWithError2.nonEmpty)
+    val fetchInfos2 = fetchRequest2.underlying.build().fetchData.asScala.toSeq
+    assertEquals(1, fetchInfos2.length)
+    assertEquals("Expected fetch request for non-delayed partition", t1p0, 
fetchInfos2.head._1)
+    assertEquals(140, fetchInfos2.head._2.fetchOffset)
+
+    // both partitions are delayed
+    val ResultWithPartitions(fetchRequest3, partitionsWithError3) =
+      thread.buildFetchRequest(Seq(
+        (t1p0, new PartitionFetchState(140, delay=new DelayedItem(5000))),
+        (t1p1, new PartitionFetchState(160, delay=new DelayedItem(5000)))))
+    assertTrue("Expected no fetch requests since all partitions are delayed", 
fetchRequest3.isEmpty)
+    assertFalse(partitionsWithError3.nonEmpty)
+  }
+
+  def stub(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica, 
partition: Partition, replicaManager: ReplicaManager) = {
+    
expect(replicaManager.getReplica(t1p0)).andReturn(Some(replicaT1p0)).anyTimes()
+    expect(replicaManager.getReplica(t1p0, 
Request.FutureLocalReplicaId)).andReturn(Some(futureReplica)).anyTimes()
+    
expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p0, 
Request.FutureLocalReplicaId)).andReturn(futureReplica).anyTimes()
+    
expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+    
expect(replicaManager.getReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes()
+    expect(replicaManager.getReplica(t1p1, 
Request.FutureLocalReplicaId)).andReturn(Some(futureReplica)).anyTimes()
+    
expect(replicaManager.getReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p1, 
Request.FutureLocalReplicaId)).andReturn(futureReplica).anyTimes()
+    
expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
+  }
+
+  def stubWithFetchMessages(replicaT1p0: Replica, replicaT1p1: Replica, 
futureReplica: Replica,
+                            partition: Partition, replicaManager: 
ReplicaManager,
+                            responseCallback: Capture[Seq[(TopicPartition, 
FetchPartitionData)] => Unit]) = {
+    stub(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager)
+    expect(replicaManager.fetchMessages(
+      EasyMock.anyLong(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          responseCallback.getValue.apply(Seq.empty[(TopicPartition, 
FetchPartitionData)])
+        }
+      }).anyTimes()
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add unit test for ReplicaAlterLogDirsThread
> -------------------------------------------
>
>                 Key: KAFKA-6795
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6795
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Anna Povzner
>            Assignee: Anna Povzner
>            Priority: Major
>
> ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but 
> there is no unit test. 
> [~lindong] I assigned this to myself, since ideally I wanted to add unit 
> tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to