Repository: kafka Updated Branches: refs/heads/trunk 018679410 -> 46aa88b9c
KAFKA-5244; Refactor BrokerTopicStats and ControllerStats so that they are classes. This removes the need to force object initialisation via hacks to register the relevant Yammer metrics during start-up. It also works around issues caused by tests that delete JVM-wide singleton metrics (like `MetricsDuringTopicCreationDeletionTest`). Without this change, they would never be registered again. After this change, they will be registered again during KafkaServer start-up. It would be even better not to rely on JVM-wide singleton metrics (like we do for Kafka Metrics), but that's a bigger change that should be considered separately. Author: Ismael Juma <[email protected]> Reviewers: Rajini Sivaram <[email protected]> Closes #3059 from ijuma/kafka-5244-broker-static-stats-and-controller-stats-as-classes Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46aa88b9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46aa88b9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46aa88b9 Branch: refs/heads/trunk Commit: 46aa88b9cf82971a0890f4f83efa9639f84c677b Parents: 0186794 Author: Ismael Juma <[email protected]> Authored: Mon May 15 22:35:17 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Mon May 15 22:35:17 2017 +0100 ---------------------------------------------------------------------- .../kafka/controller/KafkaController.scala | 24 ++-- .../controller/PartitionLeaderSelector.scala | 2 +- core/src/main/scala/kafka/log/Log.scala | 10 +- core/src/main/scala/kafka/log/LogManager.scala | 13 +- .../src/main/scala/kafka/server/KafkaApis.scala | 3 +- .../kafka/server/KafkaRequestHandler.scala | 32 +++-- .../main/scala/kafka/server/KafkaServer.scala | 28 ++-- .../kafka/server/ReplicaFetcherThread.scala | 4 +- .../scala/kafka/server/ReplicaManager.scala | 31 ++--- .../ReplicaFetcherThreadFatalErrorTest.scala | 2 +- .../test/scala/other/kafka/StressTestLog.scala | 4 +- 
.../other/kafka/TestLinearWriteSpeed.scala | 3 +- .../log/AbstractLogCleanerIntegrationTest.scala | 4 +- .../unit/kafka/log/BrokerCompressionTest.scala | 7 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 7 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 4 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 128 +++++++++++++------ .../scala/unit/kafka/metrics/MetricsTest.scala | 2 +- .../server/HighwatermarkPersistenceTest.scala | 4 +- .../unit/kafka/server/ISRExpirationTest.scala | 3 +- .../kafka/server/ReplicaManagerQuotasTest.scala | 3 +- .../unit/kafka/server/ReplicaManagerTest.scala | 18 ++- .../unit/kafka/server/SimpleFetchTest.scala | 12 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 3 +- 24 files changed, 221 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index be4dd53..41a88d9 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -42,7 +42,10 @@ import scala.collection._ import scala.util.Try class ControllerContext(val zkUtils: ZkUtils) { + val controllerStats = new ControllerStats + var controllerChannelManager: ControllerChannelManager = null + var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty var epoch: Int = KafkaController.InitialControllerEpoch - 1 var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1 @@ -150,10 +153,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState val controllerContext = new ControllerContext(zkUtils) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new 
ReplicaStateMachine(this) + // have a separate scheduler for the controller to be able to start and stop independently of the // kafka server private val kafkaScheduler = new KafkaScheduler(1) - val topicDeletionManager: TopicDeletionManager = new TopicDeletionManager(this) + + val topicDeletionManager = new TopicDeletionManager(this) val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) @@ -162,6 +167,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState private val controllerEventQueue = new LinkedBlockingQueue[ControllerEvent] private val controllerEventThread = new ControllerEventThread("controller-event-thread") + private val brokerChangeListener = new BrokerChangeListener(this) private val topicChangeListener = new TopicChangeListener(this) private val topicDeletionListener = new TopicDeletionListener(this) @@ -169,6 +175,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState private val partitionReassignmentListener = new PartitionReassignmentListener(this) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) private val isrChangeNotificationListener = new IsrChangeNotificationListener(this) + private val activeControllerId = new AtomicInteger(-1) private val offlinePartitionCount = new AtomicInteger(0) private val preferredReplicaImbalanceCount = new AtomicInteger(0) @@ -1155,7 +1162,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent { override def process(): Unit = { if (!isActive) return - ControllerStats.leaderElectionTimer.time { + 
controllerContext.controllerStats.leaderElectionTimer.time { try { val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo) val curBrokerIds = curBrokers.map(_.id) @@ -1713,16 +1720,9 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo } } -object ControllerStats extends KafkaMetricsGroup { - - private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) - private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) - - // KafkaServer needs to initialize controller metrics during startup. We perform initialization - // through method calls to avoid Scala compiler warnings. - def uncleanLeaderElectionRate: Meter = _uncleanLeaderElectionRate - - def leaderElectionTimer: KafkaTimer = _leaderElectionTimer +private[controller] class ControllerStats extends KafkaMetricsGroup { + val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) + val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) } sealed trait ControllerEvent { http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index d1799a6..42db26b 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -74,7 +74,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. 
Live " + s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are: [$assignedReplicas].") } else { - ControllerStats.uncleanLeaderElectionRate.mark() + controllerContext.controllerStats.uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicas.head warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live " + s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.") http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index a4796d1..e2d9489 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -113,6 +113,7 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i * Other activities such as log cleaning are not affected by logStartOffset. * @param recoveryPoint The offset at which to begin recovery--i.e. 
the first offset which has not been flushed to disk * @param scheduler The thread pool scheduler used for background actions + * @param brokerTopicStats Container for Broker Topic Yammer Metrics * @param time The time instance used for checking the clock * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired @@ -123,6 +124,7 @@ class Log(@volatile var dir: File, @volatile var logStartOffset: Long = 0L, @volatile var recoveryPoint: Long = 0L, scheduler: Scheduler, + brokerTopicStats: BrokerTopicStats, time: Time = Time.SYSTEM, val maxProducerIdExpirationMs: Int = 60 * 60 * 1000, val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup { @@ -560,8 +562,8 @@ class Log(@volatile var dir: File, if (batch.sizeInBytes > config.maxMessageSize) { // we record the original message set size instead of the trimmed size // to be consistent with pre-compression bytesRejectedRate recording - BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) + brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) + brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d." .format(batch.sizeInBytes, config.maxMessageSize)) } @@ -746,8 +748,8 @@ class Log(@volatile var dir: File, // Check if the message sizes are valid. 
val batchSize = batch.sizeInBytes if (batchSize > config.maxMessageSize) { - BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) + brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) + brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " + s"value of ${config.maxMessageSize}.") } http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4ce4716..0b094df 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -55,6 +55,7 @@ class LogManager(val logDirs: Array[File], val maxPidExpirationMs: Int, scheduler: Scheduler, val brokerState: BrokerState, + brokerTopicStats: BrokerTopicStats, time: Time) extends Logging { val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint" @@ -175,7 +176,8 @@ class LogManager(val logDirs: Array[File], recoveryPoint = logRecoveryPoint, maxProducerIdExpirationMs = maxPidExpirationMs, scheduler = scheduler, - time = time) + time = time, + brokerTopicStats = brokerTopicStats) if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { this.logsToBeDeleted.add(current) } else { @@ -416,7 +418,8 @@ class LogManager(val logDirs: Array[File], recoveryPoint = 0L, maxProducerIdExpirationMs = maxPidExpirationMs, scheduler = scheduler, - time = time) + time = time, + brokerTopicStats = brokerTopicStats) logs.put(topicPartition, log) info("Created log for partition [%s,%d] in %s 
with properties {%s}." .format(topicPartition.topic, @@ -572,7 +575,8 @@ object LogManager { zkUtils: ZkUtils, brokerState: BrokerState, kafkaScheduler: KafkaScheduler, - time: Time): LogManager = { + time: Time, + brokerTopicStats: BrokerTopicStats): LogManager = { val defaultProps = KafkaServer.copyKafkaConfigToLog(config) val defaultLogConfig = LogConfig(defaultProps) @@ -602,6 +606,7 @@ object LogManager { maxPidExpirationMs = config.transactionIdExpirationMs, scheduler = kafkaScheduler, brokerState = brokerState, - time = time) + time = time, + brokerTopicStats = brokerTopicStats) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5f1a2d5..c746365 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -70,6 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel, val metrics: Metrics, val authorizer: Option[Authorizer], val quotas: QuotaManagers, + brokerTopicStats: BrokerTopicStats, val clusterId: String, time: Time) extends Logging { @@ -516,7 +517,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchedPartitionData.put(topicPartition, data) // record the bytes out metrics only when the response is being sent - BrokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes) + brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes) } val response = new FetchResponse(fetchedPartitionData, 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 8dfbe64..9983e3d 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -107,7 +107,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { val tags: scala.collection.Map[String, String] = name match { - case None => scala.collection.Map.empty + case None => Map.empty case Some(topic) => Map("topic" -> topic) } @@ -142,7 +142,7 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { } } -object BrokerTopicStats extends Logging { +object BrokerTopicStats { val MessagesInPerSec = "MessagesInPerSec" val BytesInPerSec = "BytesInPerSec" val BytesOutPerSec = "BytesOutPerSec" @@ -153,25 +153,26 @@ object BrokerTopicStats extends Logging { val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec" val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec" val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec" - private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k)) - private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) - private val allTopicsStats = new BrokerTopicMetrics(None) +} - def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats +class BrokerTopicStats { + import BrokerTopicStats._ + + private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) + val allTopicsStats = new BrokerTopicMetrics(None) - def getBrokerTopicStats(topic: String): BrokerTopicMetrics = { + def topicStats(topic: String): BrokerTopicMetrics = stats.getAndMaybePut(topic) - } def updateReplicationBytesIn(value: Long) { - getBrokerAllTopicsStats.replicationBytesInRate.foreach { metric => + allTopicsStats.replicationBytesInRate.foreach { metric => metric.mark(value) } } private def updateReplicationBytesOut(value: Long) 
{ - getBrokerAllTopicsStats.replicationBytesOutRate.foreach { metric => + allTopicsStats.replicationBytesOutRate.foreach { metric => metric.mark(value) } } @@ -186,8 +187,15 @@ object BrokerTopicStats extends Logging { if (isFollower) { updateReplicationBytesOut(value) } else { - getBrokerTopicStats(topic).bytesOutRate.mark(value) - getBrokerAllTopicsStats.bytesOutRate.mark(value) + topicStats(topic).bytesOutRate.mark(value) + allTopicsStats.bytesOutRate.mark(value) } } + + + def close(): Unit = { + allTopicsStats.close() + stats.values.foreach(_.close()) + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index fdf837c..788f718 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -139,9 +139,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap private var _clusterId: String = null + private var _brokerTopicStats: BrokerTopicStats = null def clusterId: String = _clusterId + private[kafka] def brokerTopicStats = _brokerTopicStats + newGauge( "BrokerState", new Gauge[Int] { @@ -204,11 +207,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val metricConfig = KafkaServer.metricConfig(config) metrics = new Metrics(metricConfig, reporters, time, true) + /* register broker metrics */ + _brokerTopicStats = new BrokerTopicStats + quotaManagers = QuotaFactory.instantiate(config, metrics, time) notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala) /* start log manager */ - logManager = LogManager(config, zkUtils, 
brokerState, kafkaScheduler, time) + logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats) logManager.startup() metadataCache = new MetadataCache(config.brokerId) @@ -246,7 +252,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, - kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId, time) + kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, + brokerTopicStats, clusterId, time) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, config.numIoThreads) @@ -277,9 +284,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it checkpointBrokerId(config.brokerId) - /* register broker metrics */ - registerStats() - brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) @@ -304,7 +308,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP } protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = - new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, metadataCache) + new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, + brokerTopicStats, metadataCache) private def initZk(): ZkUtils = { info(s"Connecting to zookeeper on ${config.zkConnect}") @@ -345,15 +350,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP } /** - * Forces some dynamic jmx beans to be registered on server startup. 
- */ - private def registerStats() { - BrokerTopicStats.getBrokerAllTopicsStats() - ControllerStats.uncleanLeaderElectionRate - ControllerStats.leaderElectionTimer - } - - /** * Performs controlled shutdown */ private def controlledShutdown() { @@ -620,6 +616,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (metrics != null) CoreUtils.swallow(metrics.close()) + if (brokerTopicStats != null) + CoreUtils.swallow(brokerTopicStats.close()) brokerState.newState(NotRunning) http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 1148e92..bd678b3 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String, trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark") if (quota.isThrottled(topicPartition)) quota.record(records.sizeInBytes) - BrokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) + replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) } catch { case e: KafkaStorageException => fatal(s"Disk error while replicating data for $topicPartition", e) @@ -368,4 +368,4 @@ object ReplicaFetcherThread { case e => Some(e.exception) } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b3d1d32..eec9ab2 100644 --- 
a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -131,6 +131,7 @@ class ReplicaManager(val config: KafkaConfig, val logManager: LogManager, val isShuttingDown: AtomicBoolean, quotaManager: ReplicationQuotaManager, + val brokerTopicStats: BrokerTopicStats, val metadataCache: MetadataCache, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { /* epoch of the controller that last changed the leader */ @@ -264,7 +265,7 @@ class ReplicaManager(val config: KafkaConfig, removedPartition.delete() // this will delete the local log val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic) if (!topicHasPartitions) - BrokerTopicStats.removeMetrics(topicPartition.topic) + brokerTopicStats.removeMetrics(topicPartition.topic) } } case None => @@ -503,8 +504,8 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short): Map[TopicPartition, LogAppendResult] = { trace("Append [%s] to local log ".format(entriesPerPartition)) entriesPerPartition.map { case (topicPartition, records) => - BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark() + brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark() + brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark() // reject appending to internal topics if it is not allowed if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) { @@ -529,10 +530,10 @@ class ReplicaManager(val config: KafkaConfig, info.lastOffset - info.firstOffset + 1 // update stats for successfully appended bytes and messages as bytesInRate and messageInRate - BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(records.sizeInBytes) - 
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) + brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes) + brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes) + brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages) + brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages) trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset)) @@ -552,8 +553,8 @@ class ReplicaManager(val config: KafkaConfig, _: InvalidTimestampException) => (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case t: Throwable => - BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() + brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() + brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() error("Error processing append operation on partition %s".format(topicPartition), t) (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t))) } @@ -649,8 +650,8 @@ class ReplicaManager(val config: KafkaConfig, val partitionFetchSize = fetchInfo.maxBytes val followerLogStartOffset = fetchInfo.logStartOffset - BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark() + brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark() + brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() try { trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " + @@ -726,8 +727,8 @@ class 
ReplicaManager(val config: KafkaConfig, readSize = partitionFetchSize, exception = Some(e)) case e: Throwable => - BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() + brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark() + brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark() error(s"Error processing fetch operation on partition $tp, offset $offset", e) LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), hw = -1L, @@ -1119,8 +1120,8 @@ object OffsetsForLeaderEpoch extends Logging { val offset = try { new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch)) } catch { - case e: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) - case e: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) + case _: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) + case _: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) } (tp, offset) }.toMap.asJava http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala index 9a04e67..cd0c74b 100644 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala @@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness { override def 
createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = { new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, - quotaManagers.follower, metadataCache) { + quotaManagers.follower, new BrokerTopicStats, metadataCache) { override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 806702d..7583d45 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -21,6 +21,7 @@ import java.util.Properties import java.util.concurrent.atomic._ import kafka.log._ +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.record.FileRecords @@ -46,7 +47,8 @@ object StressTestLog { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerTopicStats = new BrokerTopicStats) val writer = new WriterThread(log) writer.start() val reader = new ReaderThread(log) http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 69152e3..658e3a0 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -25,6 +25,7 @@ import java.util.{Properties, Random} import joptsimple._ import 
kafka.log._ import kafka.message._ +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{Time, Utils} @@ -206,7 +207,7 @@ object TestLinearWriteSpeed { class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable { Utils.delete(dir) - val log = new Log(dir, config, 0L, 0L, scheduler, Time.SYSTEM) + val log = new Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM) def write(): Int = { log.appendAsLeader(messages, leaderEpoch = 0) messages.sizeInBytes http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index 3c9ddd1..f796042 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -20,6 +20,7 @@ import java.io.File import java.nio.file.Files import java.util.Properties +import kafka.server.BrokerTopicStats import kafka.utils.{MockTime, Pool, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Utils @@ -95,7 +96,8 @@ abstract class AbstractLogCleanerIntegrationTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerTopicStats = new BrokerTopicStats) logMap.put(partition, log) this.logs += log } http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala 
b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index a42ae22..ee46341 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -25,10 +25,12 @@ import org.junit.Assert._ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import org.apache.kafka.common.record.{SimpleRecord, CompressionType, MemoryRecords} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.utils.Utils import java.util.{Collection, Properties} +import kafka.server.BrokerTopicStats + import scala.collection.JavaConverters._ @RunWith(value = classOf[Parameterized]) @@ -53,7 +55,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin val logProps = new Properties() logProps.put(LogConfig.CompressionTypeProp, brokerCompression) /*configure broker-side compression */ - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + time = time, brokerTopicStats = new BrokerTopicStats) /* append two messages */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0, http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 9f9d982..d577f02 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -20,6 +20,7 @@ package kafka.log import 
java.io.File import java.util.Properties +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record._ @@ -234,12 +235,14 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerTopicStats = new BrokerTopicStats) log } private def makeLog(dir: File = logDir, config: LogConfig = logConfig) = - new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + time = time, brokerTopicStats = new BrokerTopicStats) private def records(key: Int, value: Int, timestamp: Long) = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes)) http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index c842fc3..704aa73 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -23,6 +23,7 @@ import java.nio.file.Paths import java.util.Properties import kafka.common._ +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record._ @@ -974,7 +975,8 @@ class LogCleanerTest extends JUnitSuite { messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset) private def makeLog(dir: File = dir, config: LogConfig = logConfig) = - new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + new 
Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + time = time, brokerTopicStats = new BrokerTopicStats) private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ } http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index e545255..d7ca029 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -27,7 +27,7 @@ import kafka.common.KafkaException import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ -import kafka.server.KafkaConfig +import kafka.server.{BrokerTopicStats, KafkaConfig} import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record._ @@ -45,6 +45,7 @@ class LogTest { val time = new MockTime() var config: KafkaConfig = null val logConfig = LogConfig() + val brokerTopicStats = new BrokerTopicStats @Before def setUp() { @@ -54,6 +55,7 @@ class LogTest { @After def tearDown() { + brokerTopicStats.close() Utils.delete(tmpDir) } @@ -100,6 +102,7 @@ class LogTest { recoveryPoint = 0L, maxProducerIdExpirationMs = 24 * 60, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) // Test the segment rolling behavior when messages do not have a timestamp. 
@@ -151,7 +154,8 @@ class LogTest { LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, - time = time) + time = time, + brokerTopicStats = new BrokerTopicStats) val pid = 1L val epoch: Short = 0 @@ -410,6 +414,7 @@ class LogTest { LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val pid = 1L @@ -485,6 +490,7 @@ class LogTest { LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val epoch: Short = 0 @@ -601,6 +607,7 @@ class LogTest { LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val pid = 1L @@ -632,6 +639,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) log.appendAsLeader(set, leaderEpoch = 0) @@ -661,7 +669,8 @@ class LogTest { // We use need to use magic value 1 here because the test is message size sensitive. 
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) // create a log - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -676,7 +685,8 @@ class LogTest { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds), leaderEpoch = 0) } @@ -689,7 +699,8 @@ class LogTest { logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) // We use need to use magic value 1 here because the test is message size sensitive. 
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray for(value <- values) @@ -713,7 +724,8 @@ class LogTest { def testAppendAndReadWithNonSequentialOffsets() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -738,7 +750,8 @@ class LogTest { def testReadAtLogGap() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -755,7 +768,8 @@ class LogTest { def testReadWithMinMessage() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new 
Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -783,7 +797,8 @@ class LogTest { def testReadWithTooSmallMaxLength() { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -819,7 +834,8 @@ class LogTest { // set up replica log starting with offset 1024 and with one message (at offset 1024) logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0) assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes) @@ -850,7 +866,8 @@ class LogTest { /* create a multipart log with 100 messages */ val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds)) @@ -888,7 +905,8 @@ class LogTest { /* this log should roll after every messageset */ val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) @@ -914,7 +932,8 @@ class LogTest { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) for(i <- 0 until messagesToAppend) log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10), leaderEpoch = 0) @@ -950,7 +969,8 @@ class LogTest { logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer) // We use need to use magic value 1 here because the test is message size sensitive. 
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) try { log.appendAsLeader(messageSet, leaderEpoch = 0) @@ -977,7 +997,8 @@ class LogTest { val logProps = new Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) try { log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) @@ -1019,7 +1040,8 @@ class LogTest { val maxMessageSize = second.sizeInBytes - 1 val logProps = new Properties() logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer) - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) // should be able to append the small message log.appendAsLeader(first, leaderEpoch = 0) @@ -1045,7 +1067,8 @@ class LogTest { logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = 
brokerTopicStats, time = time) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize), timestamp = time.milliseconds + i * 10), leaderEpoch = 0) @@ -1071,12 +1094,14 @@ class LogTest { assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries) } - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, time = time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) verifyRecoveredLog(log) log.close() // test recovery case - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) verifyRecoveredLog(log) log.close() } @@ -1092,7 +1117,8 @@ class LogTest { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val messages = (0 until numMessages).map { i => MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes())) @@ -1116,7 +1142,8 @@ class LogTest { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = 
brokerTopicStats, time = time) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.index.file) @@ -1128,7 +1155,8 @@ class LogTest { timeIndexFiles.foreach(_.delete()) // reopen the log - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) @@ -1155,7 +1183,8 @@ class LogTest { logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0") val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) @@ -1165,7 +1194,8 @@ class LogTest { timeIndexFiles.foreach(_.delete()) // The rebuilt time index should be empty - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, time = time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val 
segArray = log.logSegments.toArray for (i <- 0 until segArray.size - 1) { assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries) @@ -1186,7 +1216,8 @@ class LogTest { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.index.file) @@ -1208,7 +1239,8 @@ class LogTest { } // reopen the log - log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, time = time) + log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) { assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset) @@ -1234,7 +1266,8 @@ class LogTest { logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) // create a log - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (_ <- 1 to msgPerSeg) @@ -1289,7 +1322,8 @@ class LogTest { logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) 
logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer) val config = LogConfig(logProps) - val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -1336,6 +1370,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) @@ -1368,6 +1403,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) // add enough messages to roll over several segments then close and re-open and attempt to truncate @@ -1379,6 +1415,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) log.truncateTo(3) assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) @@ -1405,6 +1442,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) // append some messages to create some segments @@ -1446,6 +1484,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) // append some messages to create some segments @@ -1461,6 +1500,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) } @@ -1472,6 +1512,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0) val head = log.read(0, 4096, None).records.records.iterator.next() @@ -1486,6 +1527,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0)) @@ -1500,6 +1542,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0) @@ -1523,6 +1566,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val numMessages = 50 + TestUtils.random.nextInt(50) for (_ <- 0 until numMessages) @@ -1535,7 +1579,7 @@ class LogTest { TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) // attempt recovery - log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, time) + log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, brokerTopicStats, time) assertEquals(numMessages, log.logEndOffset) val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList @@ -1566,6 +1610,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes())) val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), 
"k3".getBytes())) @@ -1617,6 +1662,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) for (_ <- 0 until 100) log.appendAsLeader(createRecords, leaderEpoch = 0) @@ -1625,7 +1671,7 @@ class LogTest { // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the // clean shutdown file exists. recoveryPoint = log.logEndOffset - log = new Log(logDir, config, 0L, 0L, time.scheduler, time) + log = new Log(logDir, config, 0L, 0L, time.scheduler, brokerTopicStats, time) assertEquals(recoveryPoint, log.logEndOffset) cleanShutdownFile.delete() } @@ -1795,6 +1841,7 @@ class LogTest { logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) // append some messages to create some segments @@ -1924,7 +1971,7 @@ class LogTest { @Test def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L) + def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) val log = createLog(createRecords.sizeInBytes, retentionMs = 10000, cleanupPolicy = "compact,delete") @@ -1943,7 +1990,8 @@ class LogTest { //Given this partition is on leader epoch 72 val epoch = 72 - val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) log.leaderEpochCache.assign(epoch, records.size) //When appending messages as a leader (i.e. 
assignOffsets = true) @@ -1975,7 +2023,8 @@ class LogTest { recs } - val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) //When appending as follower (assignOffsets = false) for (i <- records.indices) @@ -2034,7 +2083,8 @@ class LogTest { def shouldTruncateLeaderEpochFileWhenTruncatingLog() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * createRecords.sizeInBytes).toString) - val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val cache = epochCache(log) //Given 2 segments, 10 messages per segment @@ -2079,7 +2129,8 @@ class LogTest { */ @Test def testLogRecoversForLeaderEpoch() { - val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val leaderEpochCache = epochCache(log) val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0) log.appendAsFollower(records = firstBatch) @@ -2101,7 +2152,8 @@ class LogTest { log.close() // reopen the log and recover from the beginning - val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time) val recoveredLeaderEpochCache = 
epochCache(recoveredLog) // epoch entries should be recovered @@ -2430,15 +2482,15 @@ class LogTest { logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy) logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) val config = LogConfig(logProps) - val log = new Log(logDir, + new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, + brokerTopicStats = brokerTopicStats, time = time, maxProducerIdExpirationMs = maxPidExpirationMs, producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs) - log } private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 745fea6..37c2619 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -86,7 +86,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238 TestUtils.produceMessages(servers, topic, nMessages) assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty) - assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic)) + servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic))) AdminUtils.deleteTopic(zkUtils, topic) TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic)) http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff 
--git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 55cfa27..b6b40c2 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -61,7 +61,7 @@ class HighwatermarkPersistenceTest { // create replica manager val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, - new MetadataCache(configs.head.brokerId)) + new BrokerTopicStats, new MetadataCache(configs.head.brokerId)) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() @@ -106,7 +106,7 @@ class HighwatermarkPersistenceTest { // create replica manager val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, - new MetadataCache(configs.head.brokerId)) + new BrokerTopicStats, new MetadataCache(configs.head.brokerId)) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 0c62a50..da94569 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -55,7 +55,8 @@ class IsrExpirationTest { @Before def setUp() { replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false), - QuotaFactory.instantiate(configs.head, metrics, 
time).follower, new MetadataCache(configs.head.brokerId)) + QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(configs.head.brokerId)) } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 9f7a47a..e770106 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -175,7 +175,8 @@ class ReplicaManagerQuotasTest { replay(logManager) replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager, - new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId)) + new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, + new BrokerTopicStats, new MetadataCache(configs.head.brokerId)) //create the two replicas for ((p, _) <- fetchInfo) { http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 4886b94..6efd0a3 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -64,7 +64,8 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, zkUtils, new 
MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) try { val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1)) partition.getOrCreateReplica(1) @@ -82,7 +83,8 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) try { val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1)) partition.getOrCreateReplica(1) @@ -99,7 +101,8 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId), Option(this.getClass.getName)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId), Option(this.getClass.getName)) try { def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = { assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS) @@ -132,7 +135,8 @@ class ReplicaManagerTest { EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() EasyMock.replay(metadataCache) val rm = new 
ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + metadataCache) try { var produceCallbackFired = false @@ -211,7 +215,8 @@ class ReplicaManagerTest { EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes() EasyMock.replay(metadataCache) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + metadataCache, Option(this.getClass.getName)) try { val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava @@ -338,7 +343,8 @@ class ReplicaManagerTest { EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes() EasyMock.replay(metadataCache) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + metadataCache, Option(this.getClass.getName)) try { val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index d7822c1..ac851d8 100644 --- 
a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -99,7 +99,8 @@ class SimpleFetchTest { // create the replica manager replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager, - new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId)) + new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(configs.head.brokerId)) // add the partition with two replicas, both in ISR val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId)) @@ -150,8 +151,9 @@ class SimpleFetchTest { */ @Test def testReadFromLog() { - val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count() - val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count() + val brokerTopicStats = new BrokerTopicStats + val initialTopicCount = brokerTopicStats.topicStats(topic).totalFetchRequestRate.count() + val initialAllTopicsCount = brokerTopicStats.allTopicsStats.totalFetchRequestRate.count() val readCommittedRecords = replicaManager.readFromLocalLog( replicaId = Request.OrdinaryConsumerId, @@ -178,7 +180,7 @@ class SimpleFetchTest { assertEquals("Reading any data can return messages up to the end of the log", recordToLEO, new SimpleRecord(firstRecord)) - assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()) - assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()) + assertEquals("Counts should increment after fetch", initialTopicCount+2, brokerTopicStats.topicStats(topic).totalFetchRequestRate.count()) + assertEquals("Counts should increment after 
fetch", initialAllTopicsCount+2, brokerTopicStats.allTopicsStats.totalFetchRequestRate.count()) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index e2511d8..012fdfd 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -975,7 +975,8 @@ object TestUtils extends Logging { maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time, - brokerState = BrokerState()) + brokerState = BrokerState(), + brokerTopicStats = new BrokerTopicStats) } @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
