This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fbfda2c KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607) fbfda2c is described below commit fbfda2c4ad889c731aa52b5214e0521f187f8db6 Author: Ismael Juma <ism...@juma.me.uk> AuthorDate: Mon May 4 21:38:53 2020 -0700 KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607) In the case described in the JIRA, there was a 50%+ increase in the total fetch request rate in 2.4.0 due to this change. I included a few additional clean-ups: * Simplify `findPreferredReadReplica` and avoid unnecessary collection copies. * Use `LongSupplier` instead of `Supplier<Long>` in `SubscriptionState` to avoid unnecessary boxing. Added a unit test to ReplicaManagerTest and cleaned up the test class a bit including consistent usage of Time in MockTimer and other components. Reviewers: Gwen Shapira <g...@confluent.io>, David Arthur <mum...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../consumer/internals/SubscriptionState.java | 8 +- .../main/scala/kafka/server/ReplicaManager.scala | 57 ++++----- .../unit/kafka/server/ReplicaManagerTest.scala | 135 +++++++++++++++------ .../scala/unit/kafka/utils/timer/MockTimer.scala | 3 +- gradle/spotbugs-exclude.xml | 7 ++ 5 files changed, 137 insertions(+), 73 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 6568c91..5b375da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -39,8 +39,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.LongSupplier; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -516,7 +516,7 @@ public class SubscriptionState { * @param preferredReadReplicaId The preferred read replica * @param timeMs The time at which this preferred replica is no longer valid */ - public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId, Supplier<Long> timeMs) { + public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId, LongSupplier timeMs) { assignedState(tp).updatePreferredReadReplica(preferredReadReplicaId, timeMs); } @@ -721,10 +721,10 @@ public class SubscriptionState { } } - private void updatePreferredReadReplica(int preferredReadReplica, Supplier<Long> timeMs) { + private void updatePreferredReadReplica(int preferredReadReplica, LongSupplier timeMs) { if (this.preferredReadReplica == null || preferredReadReplica != this.preferredReadReplica) { this.preferredReadReplica = preferredReadReplica; - this.preferredReadReplicaExpireTimeMs = timeMs.get(); + this.preferredReadReplicaExpireTimeMs = timeMs.getAsLong(); } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5bdc3d2..b387785 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig, metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs)) if (preferredReadReplica.isDefined) { - replicaSelectorOpt.foreach{ selector => + replicaSelectorOpt.foreach { selector => debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " + s"${preferredReadReplica.get} for $clientMetadata") } @@ -1079,9 +1079,9 @@ class ReplicaManager(val config: KafkaConfig, fetchOnlyFromLeader = fetchOnlyFromLeader, minOneMessage = minOneMessage) - // Check if the HW known to the follower is behind the actual HW - val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId) - .exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark) + // Check if the HW known to the follower is behind the actual HW if a replica selector is defined + val followerNeedsHwUpdate = replicaSelectorOpt.isDefined && + partition.getReplica(replicaId).exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark) val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) { // If the partition is being throttled, simply return an empty set. @@ -1170,44 +1170,35 @@ class ReplicaManager(val config: KafkaConfig, replicaId: Int, fetchOffset: Long, currentTimeMs: Long): Option[Int] = { - if (partition.isLeader) { - if (Request.isValidBrokerId(replicaId)) { - // Don't look up preferred for follower fetches via normal replication - Option.empty - } else { + partition.leaderReplicaIdOpt.flatMap { leaderReplicaId => + // Don't look up preferred for follower fetches via normal replication + if (Request.isValidBrokerId(replicaId)) + None + else { replicaSelectorOpt.flatMap { replicaSelector => - val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition, new ListenerName(clientMetadata.listenerName)) - var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas + val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition, + new ListenerName(clientMetadata.listenerName)) + val replicaInfos = partition.remoteReplicas // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR) - .filter(replica => replica.logEndOffset >= fetchOffset) - .filter(replica => replica.logStartOffset <= fetchOffset) + .filter(replica => replica.logEndOffset >= fetchOffset && replica.logStartOffset <= fetchOffset) .map(replica => new DefaultReplicaView( replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()), replica.logEndOffset, currentTimeMs - replica.lastCaughtUpTimeMs)) - .toSet - - if (partition.leaderReplicaIdOpt.isDefined) { - val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt - .map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode())) - .map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L)) - .get - replicaInfoSet ++= Set(leaderReplica) - - val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica) - replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala - .filter(!_.endpoint.isEmpty) - // Even though the replica selector can return the leader, we don't want to send it out with the - // FetchResponse, so we exclude it here - .filter(!_.equals(leaderReplica)) - .map(_.endpoint.id) - } else { - None + + val leaderReplica = new DefaultReplicaView( + replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()), + partition.localLogOrException.logEndOffset, 0L) + val replicaInfoSet = mutable.Set[ReplicaView]() ++= replicaInfos += leaderReplica + + val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica) + replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect { + // Even though the replica selector can return the leader, we don't want to send it out with the + // FetchResponse, so we exclude it here + case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id } } } - } else { - None } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 0ed04d5..09e4f14 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -221,7 +221,7 @@ class ReplicaManagerTest { } private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = { - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) try { val brokerList = Seq[Integer](0, 1).asJava val topicPartition = new TopicPartition(topic, 0) @@ -277,7 +277,7 @@ class ReplicaManagerTest { @Test def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = { - val timer = new MockTimer + val timer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) try { @@ -337,7 +337,7 @@ class ReplicaManagerTest { @Test def testReadCommittedFetchLimitedAtLSO(): Unit = { - val timer = new MockTimer + val timer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) try { @@ -444,7 +444,7 @@ class ReplicaManagerTest { @Test def testDelayedFetchIncludesAbortedTransactions(): Unit = { - val timer = new MockTimer + val timer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) try { @@ -521,7 +521,7 @@ class ReplicaManagerTest { @Test def testFetchBeyondHighWatermark(): Unit = { - val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2)) + val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2)) try { val brokerList = Seq[Integer](0, 1, 2).asJava @@ -579,7 +579,7 @@ class ReplicaManagerTest { val maxFetchBytes = 1024 * 1024 val aliveBrokersIds = Seq(0, 1) val leaderEpoch = 5 - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokersIds) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokersIds) try { val tp = new TopicPartition(topic, 0) val replicas = aliveBrokersIds.toList.map(Int.box).asJava @@ -677,7 +677,7 @@ class ReplicaManagerTest { */ @Test def testFetchMessagesWhenNotFollowerForOnePartition(): Unit = { - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2)) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2)) try { // Create 2 partitions, assign replica 0 as the leader for both a different follower (1 and 2) for each @@ -791,8 +791,9 @@ class ReplicaManagerTest { val countDownLatch = new CountDownLatch(1) // Prepare the mocked components for the test - val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager( - topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true) + val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, + expectTruncation = true, localLogOffset = Some(10)) // Initialize partition state to follower, with leader = 1, leaderEpoch = 1 val tp = new TopicPartition(topic, topicPartition) @@ -830,7 +831,7 @@ class ReplicaManagerTest { val countDownLatch = new CountDownLatch(1) // Prepare the mocked components for the test - val (replicaManager, _) = prepareReplicaManagerAndLogManager( + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true) @@ -863,7 +864,7 @@ class ReplicaManagerTest { val countDownLatch = new CountDownLatch(1) // Prepare the mocked components for the test - val (replicaManager, _) = prepareReplicaManagerAndLogManager( + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true) @@ -912,7 +913,7 @@ class ReplicaManagerTest { val countDownLatch = new CountDownLatch(1) // Prepare the mocked components for the test - val (replicaManager, _) = prepareReplicaManagerAndLogManager( + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true) @@ -951,6 +952,70 @@ class ReplicaManagerTest { assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined) } + @Test + def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = { + val topicPartition = 0 + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + val timer = new MockTimer(time) + + // Prepare the mocked components for the test + val (replicaManager, _) = prepareReplicaManagerAndLogManager(timer, + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true) + + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + + replicaManager.createPartition(new TopicPartition(topic, 0)) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + + val simpleRecords = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)) + val appendResult = appendRecords(replicaManager, tp0, + MemoryRecords.withRecords(CompressionType.NONE, simpleRecords.toSeq: _*), AppendOrigin.Client) + + // Increment the hw in the leader by fetching from the last offset + val fetchOffset = simpleRecords.size + var followerResult = fetchAsFollower(replicaManager, tp0, + new PartitionData(fetchOffset, 0, 100000, Optional.empty()), + clientMetadata = None) + assertTrue(followerResult.isFired) + assertEquals(0, followerResult.assertFired.highWatermark) + + assertTrue("Expected producer request to be acked", appendResult.isFired) + + // Fetch from the same offset, no new data is expected and hence the fetch request should + // go to the purgatory + followerResult = fetchAsFollower(replicaManager, tp0, + new PartitionData(fetchOffset, 0, 100000, Optional.empty()), + clientMetadata = None, minBytes = 1000) + assertFalse("Request completed immediately unexpectedly", followerResult.isFired) + + // Complete the request in the purgatory by advancing the clock + timer.advanceClock(1001) + assertTrue(followerResult.isFired) + + assertEquals(fetchOffset, followerResult.assertFired.highWatermark) + } + @Test(expected = classOf[ClassNotFoundException]) def testUnknownReplicaSelector(): Unit = { val topicPartition = 0 @@ -962,7 +1027,7 @@ class ReplicaManagerTest { val props = new Properties() props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class") - prepareReplicaManagerAndLogManager( + prepareReplicaManagerAndLogManager(new MockTimer(time), topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props) } @@ -976,7 +1041,7 @@ class ReplicaManagerTest { val leaderEpochIncrement = 2 val countDownLatch = new CountDownLatch(1) - val (replicaManager, _) = prepareReplicaManagerAndLogManager( + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true) assertFalse(replicaManager.replicaSelectorOpt.isDefined) @@ -984,7 +1049,7 @@ class ReplicaManagerTest { @Test def testFetchFollowerNotAllowedForOlderClients(): Unit = { - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1)) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) @@ -1022,7 +1087,7 @@ class ReplicaManagerTest { @Test def testFetchRequestRateMetrics(): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1067,7 +1132,7 @@ class ReplicaManagerTest { @Test def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1115,7 +1180,7 @@ class ReplicaManagerTest { @Test def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1164,7 +1229,7 @@ class ReplicaManagerTest { @Test def testFetchFromLeaderAlwaysAllowed(): Unit = { - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1)) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) @@ -1205,7 +1270,7 @@ class ReplicaManagerTest { // In this case, we should ensure that pending purgatory operations are cancelled // immediately rather than sitting around to timeout. - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1244,7 +1309,7 @@ class ReplicaManagerTest { @Test def testClearProducePurgatoryOnStopReplica(): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1329,22 +1394,22 @@ class ReplicaManagerTest { * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing * 'leaderEpochInLeaderAndIsr' leader epoch for partition 'topicPartition'. */ - private def prepareReplicaManagerAndLogManager(topicPartition: Int, + private def prepareReplicaManagerAndLogManager(timer: MockTimer, + topicPartition: Int, leaderEpochInLeaderAndIsr: Int, followerBrokerId: Int, leaderBrokerId: Int, countDownLatch: CountDownLatch, expectTruncation: Boolean, + localLogOffset: Option[Long] = None, + offsetFromLeader: Long = 5, + leaderEpochFromLeader: Int = 3, extraProps: Properties = new Properties()) : (ReplicaManager, LogManager) = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) props.asScala ++= extraProps.asScala val config = KafkaConfig.fromProps(props) - // Setup mock local log to have leader epoch of 3 and offset of 10 - val localLogOffset = 10 - val offsetFromLeader = 5 - val leaderEpochFromLeader = 3 val mockScheduler = new MockScheduler(time) val mockBrokerTopicStats = new BrokerTopicStats val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) @@ -1365,14 +1430,17 @@ class ReplicaManagerTest { override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = { assertEquals(leaderEpoch, leaderEpochFromLeader) - Some(OffsetAndEpoch(localLogOffset, leaderEpochFromLeader)) + localLogOffset.map { logOffset => + Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader)) + }.getOrElse(super.endOffsetForEpoch(leaderEpoch)) } override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader) - override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset) + override def logEndOffsetMetadata: LogOffsetMetadata = + localLogOffset.map(LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata) - override def logEndOffset: Long = localLogOffset + override def logEndOffset: Long = localLogOffset.getOrElse(super.logEndOffset) } // Expect to call LogManager.truncateTo exactly once @@ -1414,7 +1482,6 @@ class ReplicaManagerTest { .anyTimes() EasyMock.replay(metadataCache) - val timer = new MockTimer val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( purgatoryName = "Produce", timer, reaperEnabled = false) val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( @@ -1822,7 +1889,7 @@ class ReplicaManagerTest { @Test def testStopReplicaWithStaleControllerEpoch(): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1848,7 +1915,7 @@ class ReplicaManagerTest { @Test def testStopReplicaWithOfflinePartition(): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1890,7 +1957,7 @@ class ReplicaManagerTest { } private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean, throwIOException: Boolean): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) @@ -1983,7 +2050,7 @@ class ReplicaManagerTest { deletePartition: Boolean, throwIOException: Boolean, expectedOutput: Errors): Unit = { - val mockTimer = new MockTimer + val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val tp0 = new TopicPartition(topic, 0) diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala index 8805b11..819954a 100644 --- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala +++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala @@ -20,9 +20,8 @@ import kafka.utils.MockTime import scala.collection.mutable -class MockTimer extends Timer { +class MockTimer(val time: MockTime = new MockTime) extends Timer { - val time = new MockTime private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse) def add(timerTask: TimerTask): Unit = { diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 86cc464..6e9a6c1 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -156,6 +156,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read <Match> <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 --> + <Source name="ReplicaManager.scala"/> + <Package name="kafka.server"/> + <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/> + </Match> + + <Match> + <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 --> <Source name="LogManager.scala"/> <Package name="kafka.log"/> <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>