This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fbfda2c  KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607)
fbfda2c is described below

commit fbfda2c4ad889c731aa52b5214e0521f187f8db6
Author: Ismael Juma <ism...@juma.me.uk>
AuthorDate: Mon May 4 21:38:53 2020 -0700

    KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607)
    
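    In the case described in the JIRA, there was a 50%+ increase in the total fetch request rate in
    2.4.0 due to the immediate fetch response for hw propagation, which this commit disables when
    no replica selector is defined.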
    
    I included a few additional clean-ups:
    * Simplify `findPreferredReadReplica` and avoid unnecessary collection copies.
    * Use `LongSupplier` instead of `Supplier<Long>` in `SubscriptionState` to avoid unnecessary boxing.
    
    Added a unit test to ReplicaManagerTest and cleaned up the test class a bit, including
    consistent usage of Time in MockTimer and other components.
    
    Reviewers: Gwen Shapira <g...@confluent.io>, David Arthur <mum...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/SubscriptionState.java      |   8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  57 ++++-----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 135 +++++++++++++++------
 .../scala/unit/kafka/utils/timer/MockTimer.scala   |   3 +-
 gradle/spotbugs-exclude.xml                        |   7 ++
 5 files changed, 137 insertions(+), 73 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6568c91..5b375da 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -39,8 +39,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.LongSupplier;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -516,7 +516,7 @@ public class SubscriptionState {
      * @param preferredReadReplicaId The preferred read replica
      * @param timeMs The time at which this preferred replica is no longer 
valid
      */
-    public synchronized void updatePreferredReadReplica(TopicPartition tp, int 
preferredReadReplicaId, Supplier<Long> timeMs) {
+    public synchronized void updatePreferredReadReplica(TopicPartition tp, int 
preferredReadReplicaId, LongSupplier timeMs) {
         assignedState(tp).updatePreferredReadReplica(preferredReadReplicaId, 
timeMs);
     }
 
@@ -721,10 +721,10 @@ public class SubscriptionState {
             }
         }
 
-        private void updatePreferredReadReplica(int preferredReadReplica, 
Supplier<Long> timeMs) {
+        private void updatePreferredReadReplica(int preferredReadReplica, 
LongSupplier timeMs) {
             if (this.preferredReadReplica == null || preferredReadReplica != 
this.preferredReadReplica) {
                 this.preferredReadReplica = preferredReadReplica;
-                this.preferredReadReplicaExpireTimeMs = timeMs.get();
+                this.preferredReadReplicaExpireTimeMs = timeMs.getAsLong();
             }
         }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5bdc3d2..b387785 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig,
           metadata => findPreferredReadReplica(partition, metadata, replicaId, 
fetchInfo.fetchOffset, fetchTimeMs))
 
         if (preferredReadReplica.isDefined) {
-          replicaSelectorOpt.foreach{ selector =>
+          replicaSelectorOpt.foreach { selector =>
             debug(s"Replica selector ${selector.getClass.getSimpleName} 
returned preferred replica " +
               s"${preferredReadReplica.get} for $clientMetadata")
           }
@@ -1079,9 +1079,9 @@ class ReplicaManager(val config: KafkaConfig,
             fetchOnlyFromLeader = fetchOnlyFromLeader,
             minOneMessage = minOneMessage)
 
-          // Check if the HW known to the follower is behind the actual HW
-          val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
-            .exists(replica => replica.lastSentHighWatermark < 
readInfo.highWatermark)
+          // Check if the HW known to the follower is behind the actual HW if 
a replica selector is defined
+          val followerNeedsHwUpdate = replicaSelectorOpt.isDefined &&
+            partition.getReplica(replicaId).exists(replica => 
replica.lastSentHighWatermark < readInfo.highWatermark)
 
           val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, 
replicaId)) {
             // If the partition is being throttled, simply return an empty set.
@@ -1170,44 +1170,35 @@ class ReplicaManager(val config: KafkaConfig,
                                replicaId: Int,
                                fetchOffset: Long,
                                currentTimeMs: Long): Option[Int] = {
-    if (partition.isLeader) {
-      if (Request.isValidBrokerId(replicaId)) {
-        // Don't look up preferred for follower fetches via normal replication
-        Option.empty
-      } else {
+    partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
+      // Don't look up preferred for follower fetches via normal replication
+      if (Request.isValidBrokerId(replicaId))
+        None
+      else {
         replicaSelectorOpt.flatMap { replicaSelector =>
-          val replicaEndpoints = 
metadataCache.getPartitionReplicaEndpoints(partition.topicPartition, new 
ListenerName(clientMetadata.listenerName))
-          var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas
+          val replicaEndpoints = 
metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
+            new ListenerName(clientMetadata.listenerName))
+          val replicaInfos = partition.remoteReplicas
             // Exclude replicas that don't have the requested offset (whether 
or not if they're in the ISR)
-            .filter(replica => replica.logEndOffset >= fetchOffset)
-            .filter(replica => replica.logStartOffset <= fetchOffset)
+            .filter(replica => replica.logEndOffset >= fetchOffset && 
replica.logStartOffset <= fetchOffset)
             .map(replica => new DefaultReplicaView(
               replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
               replica.logEndOffset,
               currentTimeMs - replica.lastCaughtUpTimeMs))
-            .toSet
-
-          if (partition.leaderReplicaIdOpt.isDefined) {
-            val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
-              .map(replicaId => replicaEndpoints.getOrElse(replicaId, 
Node.noNode()))
-              .map(leaderNode => new DefaultReplicaView(leaderNode, 
partition.localLogOrException.logEndOffset, 0L))
-              .get
-            replicaInfoSet ++= Set(leaderReplica)
-
-            val partitionInfo = new 
DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
-            replicaSelector.select(partition.topicPartition, clientMetadata, 
partitionInfo).asScala
-              .filter(!_.endpoint.isEmpty)
-              // Even though the replica selector can return the leader, we 
don't want to send it out with the
-              // FetchResponse, so we exclude it here
-              .filter(!_.equals(leaderReplica))
-              .map(_.endpoint.id)
-          } else {
-            None
+
+          val leaderReplica = new DefaultReplicaView(
+            replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
+            partition.localLogOrException.logEndOffset, 0L)
+          val replicaInfoSet = mutable.Set[ReplicaView]() ++= replicaInfos += 
leaderReplica
+
+          val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, 
leaderReplica)
+          replicaSelector.select(partition.topicPartition, clientMetadata, 
partitionInfo).asScala.collect {
+            // Even though the replica selector can return the leader, we 
don't want to send it out with the
+            // FetchResponse, so we exclude it here
+            case selected if !selected.endpoint.isEmpty && selected != 
leaderReplica => selected.endpoint.id
           }
         }
       }
-    } else {
-      None
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0ed04d5..09e4f14 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -221,7 +221,7 @@ class ReplicaManagerTest {
   }
 
   private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): 
Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
@@ -277,7 +277,7 @@ class ReplicaManagerTest {
 
   @Test
   def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
-    val timer = new MockTimer
+    val timer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
@@ -337,7 +337,7 @@ class ReplicaManagerTest {
 
   @Test
   def testReadCommittedFetchLimitedAtLSO(): Unit = {
-    val timer = new MockTimer
+    val timer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
@@ -444,7 +444,7 @@ class ReplicaManagerTest {
 
   @Test
   def testDelayedFetchIncludesAbortedTransactions(): Unit = {
-    val timer = new MockTimer
+    val timer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
@@ -521,7 +521,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchBeyondHighWatermark(): Unit = {
-    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, 
aliveBrokerIds = Seq(0, 1, 2))
+    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), 
aliveBrokerIds = Seq(0, 1, 2))
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
@@ -579,7 +579,7 @@ class ReplicaManagerTest {
     val maxFetchBytes = 1024 * 1024
     val aliveBrokersIds = Seq(0, 1)
     val leaderEpoch = 5
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer, aliveBrokersIds)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokersIds)
     try {
       val tp = new TopicPartition(topic, 0)
       val replicas = aliveBrokersIds.toList.map(Int.box).asJava
@@ -677,7 +677,7 @@ class ReplicaManagerTest {
    */
   @Test
   def testFetchMessagesWhenNotFollowerForOnePartition(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer, aliveBrokerIds = Seq(0, 1, 2))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
 
     try {
       // Create 2 partitions, assign replica 0 as the leader for both a 
different follower (1 and 2) for each
@@ -791,8 +791,9 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
-      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, 
leaderBrokerId, countDownLatch, expectTruncation = true)
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, 
leaderBrokerId, countDownLatch,
+      expectTruncation = true, localLogOffset = Some(10))
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
     val tp = new TopicPartition(topic, topicPartition)
@@ -830,7 +831,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -863,7 +864,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -912,7 +913,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -951,6 +952,70 @@ class ReplicaManagerTest {
     assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
   }
 
+  @Test
+  def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+    val timer = new MockTimer(time)
+
+    // Prepare the mocked components for the test
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(timer,
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true)
+
+    val brokerList = Seq[Integer](0, 1).asJava
+
+    val tp0 = new TopicPartition(topic, 0)
+
+    replicaManager.createPartition(new TopicPartition(topic, 0))
+
+    // Make this replica the leader
+    val leaderAndIsrRequest2 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(0)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(brokerList)
+        .setZkVersion(0)
+        .setReplicas(brokerList)
+        .setIsNew(false)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => 
())
+
+    val simpleRecords = Seq(new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes))
+    val appendResult = appendRecords(replicaManager, tp0,
+      MemoryRecords.withRecords(CompressionType.NONE, simpleRecords.toSeq: 
_*), AppendOrigin.Client)
+
+    // Increment the hw in the leader by fetching from the last offset
+    val fetchOffset = simpleRecords.size
+    var followerResult = fetchAsFollower(replicaManager, tp0,
+      new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
+      clientMetadata = None)
+    assertTrue(followerResult.isFired)
+    assertEquals(0, followerResult.assertFired.highWatermark)
+
+    assertTrue("Expected producer request to be acked", appendResult.isFired)
+
+    // Fetch from the same offset, no new data is expected and hence the fetch 
request should
+    // go to the purgatory
+    followerResult = fetchAsFollower(replicaManager, tp0,
+      new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
+      clientMetadata = None, minBytes = 1000)
+    assertFalse("Request completed immediately unexpectedly", 
followerResult.isFired)
+
+    // Complete the request in the purgatory by advancing the clock
+    timer.advanceClock(1001)
+    assertTrue(followerResult.isFired)
+
+    assertEquals(fetchOffset, followerResult.assertFired.highWatermark)
+  }
+
   @Test(expected = classOf[ClassNotFoundException])
   def testUnknownReplicaSelector(): Unit = {
     val topicPartition = 0
@@ -962,7 +1027,7 @@ class ReplicaManagerTest {
 
     val props = new Properties()
     props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
-    prepareReplicaManagerAndLogManager(
+    prepareReplicaManagerAndLogManager(new MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = 
props)
   }
@@ -976,7 +1041,7 @@ class ReplicaManagerTest {
     val leaderEpochIncrement = 2
     val countDownLatch = new CountDownLatch(1)
 
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
     assertFalse(replicaManager.replicaSelectorOpt.isDefined)
@@ -984,7 +1049,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchFollowerNotAllowedForOlderClients(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer, aliveBrokerIds = Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@@ -1022,7 +1087,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchRequestRateMetrics(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1067,7 +1132,7 @@ class ReplicaManagerTest {
 
   @Test
   def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1115,7 +1180,7 @@ class ReplicaManagerTest {
 
   @Test
   def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1164,7 +1229,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchFromLeaderAlwaysAllowed(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer, aliveBrokerIds = Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@@ -1205,7 +1270,7 @@ class ReplicaManagerTest {
     // In this case, we should ensure that pending purgatory operations are 
cancelled
     // immediately rather than sitting around to timeout.
 
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1244,7 +1309,7 @@ class ReplicaManagerTest {
 
   @Test
   def testClearProducePurgatoryOnStopReplica(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1329,22 +1394,22 @@ class ReplicaManagerTest {
    * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest 
containing
    * 'leaderEpochInLeaderAndIsr' leader epoch for partition 'topicPartition'.
    */
-  private def prepareReplicaManagerAndLogManager(topicPartition: Int,
+  private def prepareReplicaManagerAndLogManager(timer: MockTimer,
+                                                 topicPartition: Int,
                                                  leaderEpochInLeaderAndIsr: 
Int,
                                                  followerBrokerId: Int,
                                                  leaderBrokerId: Int,
                                                  countDownLatch: 
CountDownLatch,
                                                  expectTruncation: Boolean,
+                                                 localLogOffset: Option[Long] 
= None,
+                                                 offsetFromLeader: Long = 5,
+                                                 leaderEpochFromLeader: Int = 
3,
                                                  extraProps: Properties = new 
Properties()) : (ReplicaManager, LogManager) = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     props.asScala ++= extraProps.asScala
     val config = KafkaConfig.fromProps(props)
 
-    // Setup mock local log to have leader epoch of 3 and offset of 10
-    val localLogOffset = 10
-    val offsetFromLeader = 5
-    val leaderEpochFromLeader = 3
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new 
LogDirFailureChannel(config.logDirs.size)
@@ -1365,14 +1430,17 @@ class ReplicaManagerTest {
 
       override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] 
= {
         assertEquals(leaderEpoch, leaderEpochFromLeader)
-        Some(OffsetAndEpoch(localLogOffset, leaderEpochFromLeader))
+        localLogOffset.map { logOffset =>
+          Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader))
+        }.getOrElse(super.endOffsetForEpoch(leaderEpoch))
       }
 
       override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
 
-      override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
+      override def logEndOffsetMetadata: LogOffsetMetadata =
+        
localLogOffset.map(LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
 
-      override def logEndOffset: Long = localLogOffset
+      override def logEndOffset: Long = 
localLogOffset.getOrElse(super.logEndOffset)
     }
 
     // Expect to call LogManager.truncateTo exactly once
@@ -1414,7 +1482,6 @@ class ReplicaManagerTest {
       .anyTimes()
     EasyMock.replay(metadataCache)
 
-    val timer = new MockTimer
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)
     val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@@ -1822,7 +1889,7 @@ class ReplicaManagerTest {
 
   @Test
   def testStopReplicaWithStaleControllerEpoch(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1848,7 +1915,7 @@ class ReplicaManagerTest {
 
   @Test
   def testStopReplicaWithOfflinePartition(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1890,7 +1957,7 @@ class ReplicaManagerTest {
   }
 
   private def testStopReplicaWithInexistentPartition(deletePartitions: 
Boolean, throwIOException: Boolean): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1983,7 +2050,7 @@ class ReplicaManagerTest {
                                                    deletePartition: Boolean,
                                                    throwIOException: Boolean,
                                                    expectedOutput: Errors): 
Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala 
b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
index 8805b11..819954a 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
@@ -20,9 +20,8 @@ import kafka.utils.MockTime
 
 import scala.collection.mutable
 
-class MockTimer extends Timer {
+class MockTimer(val time: MockTime = new MockTime) extends Timer {
 
-  val time = new MockTime
   private val taskQueue = 
mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse)
 
   def add(timerTask: TimerTask): Unit = {
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 86cc464..6e9a6c1 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -156,6 +156,13 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
     <Match>
         <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
+        <Source name="ReplicaManager.scala"/>
+        <Package name="kafka.server"/>
+        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
+    </Match>
+
+    <Match>
+        <!-- Uncallable anonymous methods are left behind after inlining by 
scalac 2.12, fixed in 2.13 -->
         <Source name="LogManager.scala"/>
         <Package name="kafka.log"/>
         <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>

Reply via email to