http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/DelayedItem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala index cbab2a0..9d92b97 100644 --- a/core/src/main/scala/kafka/utils/DelayedItem.scala +++ b/core/src/main/scala/kafka/utils/DelayedItem.scala @@ -18,11 +18,14 @@ package kafka.utils import java.util.concurrent._ + +import org.apache.kafka.common.utils.Time + import scala.math._ class DelayedItem(delayMs: Long) extends Delayed with Logging { - private val dueMs = SystemTime.milliseconds + delayMs + private val dueMs = Time.SYSTEM.milliseconds + delayMs def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay)) @@ -30,15 +33,12 @@ class DelayedItem(delayMs: Long) extends Delayed with Logging { * The remaining delay time */ def getDelay(unit: TimeUnit): Long = { - unit.convert(max(dueMs - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) + unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = { val other = d.asInstanceOf[DelayedItem] - - if(dueMs < other.dueMs) -1 - else if(dueMs > other.dueMs) 1 - else 0 + java.lang.Long.compare(dueMs, other.dueMs) } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala index 220b6e1..e3d389b 100644 --- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala +++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala @@ -22,10 +22,10 @@ import java.io.IOException import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient} import org.apache.kafka.common.Node import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.utils.Time import scala.annotation.tailrec import scala.collection.JavaConverters._ -import org.apache.kafka.common.utils.{Time => JTime} object NetworkClientBlockingOps { implicit def networkClientBlockingOps(client: NetworkClient): NetworkClientBlockingOps = @@ -52,7 +52,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { * This method can be used to check the status of a connection prior to calling `blockingReady` to be able * to tell whether the latter completed a new connection. */ - def isReady(node: Node)(implicit time: JTime): Boolean = { + def isReady(node: Node)(implicit time: Time): Boolean = { val currentTime = time.milliseconds() client.poll(0, currentTime) client.isReady(node, currentTime) @@ -70,7 +70,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with * care. 
*/ - def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = { + def blockingReady(node: Node, timeout: Long)(implicit time: Time): Boolean = { require(timeout >=0, "timeout should be >= 0") val startTime = time.milliseconds() @@ -103,7 +103,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with * care. */ - def blockingSendAndReceive(request: ClientRequest, body: AbstractRequest)(implicit time: JTime): ClientResponse = { + def blockingSendAndReceive(request: ClientRequest, body: AbstractRequest)(implicit time: Time): ClientResponse = { client.send(request, time.milliseconds()) pollContinuously { responses => @@ -126,7 +126,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with * care. */ - private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: JTime): T = { + private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: Time): T = { @tailrec def recursivePoll: T = { http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/Throttler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala index 998ade1..e781cd6 100644 --- a/core/src/main/scala/kafka/utils/Throttler.scala +++ b/core/src/main/scala/kafka/utils/Throttler.scala @@ -18,8 +18,11 @@ package kafka.utils import kafka.metrics.KafkaMetricsGroup +import org.apache.kafka.common.utils.Time + import java.util.concurrent.TimeUnit import java.util.Random + import scala.math._ /** @@ -33,19 +36,23 @@ import scala.math._ * @param time: The time implementation to use */ @threadsafe -class 
Throttler(val desiredRatePerSec: Double, - val checkIntervalMs: Long = 100L, - val throttleDown: Boolean = true, +class Throttler(desiredRatePerSec: Double, + checkIntervalMs: Long = 100L, + throttleDown: Boolean = true, metricName: String = "throttler", units: String = "entries", - val time: Time = SystemTime) extends Logging with KafkaMetricsGroup { + time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup { private val lock = new Object private val meter = newMeter(metricName, units, TimeUnit.SECONDS) + private val checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs) private var periodStartNs: Long = time.nanoseconds private var observedSoFar: Double = 0.0 def maybeThrottle(observed: Double) { + val msPerSec = TimeUnit.SECONDS.toMillis(1) + val nsPerSec = TimeUnit.SECONDS.toNanos(1) + meter.mark(observed.toLong) lock synchronized { observedSoFar += observed @@ -53,15 +60,15 @@ class Throttler(val desiredRatePerSec: Double, val elapsedNs = now - periodStartNs // if we have completed an interval AND we have observed something, maybe // we should take a little nap - if(elapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) { - val rateInSecs = (observedSoFar * Time.NsPerSec) / elapsedNs + if (elapsedNs > checkIntervalNs && observedSoFar > 0) { + val rateInSecs = (observedSoFar * nsPerSec) / elapsedNs val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec)) - if(needAdjustment) { + if (needAdjustment) { // solve for the amount of time to sleep to make us hit the desired rate - val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble - val elapsedMs = elapsedNs / Time.NsPerMs + val desiredRateMs = desiredRatePerSec / msPerSec.toDouble + val elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs) val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs) - if(sleepTime > 0) { + if (sleepTime > 0) { trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, 
desiredRatePerSec, sleepTime)) time.sleep(sleepTime) } @@ -71,14 +78,14 @@ class Throttler(val desiredRatePerSec: Double, } } } - + } object Throttler { def main(args: Array[String]) { val rand = new Random() - val throttler = new Throttler(100000, 100, true, time = SystemTime) + val throttler = new Throttler(100000, 100, true, time = Time.SYSTEM) val interval = 30000 var start = System.currentTimeMillis var total = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/Time.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Time.scala b/core/src/main/scala/kafka/utils/Time.scala deleted file mode 100644 index d578a6a..0000000 --- a/core/src/main/scala/kafka/utils/Time.scala +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.utils - -import java.util.concurrent.TimeUnit - -/** - * Some common constants - */ -object Time { - val NsPerUs = 1000 - val UsPerMs = 1000 - val MsPerSec = 1000 - val NsPerMs = NsPerUs * UsPerMs - val NsPerSec = NsPerMs * MsPerSec - val UsPerSec = UsPerMs * MsPerSec - val SecsPerMin = 60 - val MinsPerHour = 60 - val HoursPerDay = 24 - val SecsPerHour = SecsPerMin * MinsPerHour - val SecsPerDay = SecsPerHour * HoursPerDay - val MinsPerDay = MinsPerHour * HoursPerDay -} - -/** - * A mockable interface for time functions - */ -trait Time extends org.apache.kafka.common.utils.Time { - - def milliseconds: Long - - def nanoseconds: Long - - def hiResClockMs: Long = TimeUnit.NANOSECONDS.toMillis(nanoseconds) - - def sleep(ms: Long) -} - -/** - * The normal system implementation of time functions - */ -object SystemTime extends Time { - - def milliseconds: Long = System.currentTimeMillis - - def nanoseconds: Long = System.nanoTime - - def sleep(ms: Long): Unit = Thread.sleep(ms) - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index de56fe2..fcb5648 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -32,6 +32,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer import org.I0Itec.zkclient.{ZkClient, ZkConnection} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.utils.Time import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.{ACL, Stat} @@ -364,7 +365,7 @@ class ZkUtils(val zkClient: ZkClient, rack: Option[String], apiVersion: ApiVersion) { val brokerIdPath = 
BrokerIdsPath + "/" + id - val timestamp = SystemTime.milliseconds.toString + val timestamp = Time.SYSTEM.milliseconds.toString val version = if (apiVersion >= KAFKA_0_10_0_IV1) 3 else 2 var jsonMap = Map("version" -> version, http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/timer/Timer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index 67de276..0538271 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -21,8 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.utils.threadsafe -import org.apache.kafka.common.utils.Utils -import kafka.utils.SystemTime +import org.apache.kafka.common.utils.{Time, Utils} trait Timer { /** @@ -56,7 +55,7 @@ trait Timer { class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, - startMs: Long = SystemTime.hiResClockMs) extends Timer { + startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer { // timeout timer private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() { @@ -82,7 +81,7 @@ class SystemTimer(executorName: String, def add(timerTask: TimerTask): Unit = { readLock.lock() try { - addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + SystemTime.hiResClockMs)) + addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs)) } finally { readLock.unlock() } http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index 7a77b27..3dbfa8f 100644 --- 
a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -16,10 +16,11 @@ */ package kafka.utils.timer -import java.util.concurrent.{TimeUnit, Delayed} -import java.util.concurrent.atomic.{AtomicLong, AtomicInteger} +import java.util.concurrent.{Delayed, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import kafka.utils.{SystemTime, threadsafe} +import kafka.utils.threadsafe +import org.apache.kafka.common.utils.Time import scala.math._ @@ -117,7 +118,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } def getDelay(unit: TimeUnit): Long = { - unit.convert(max(getExpiration - SystemTime.hiResClockMs, 0), TimeUnit.MILLISECONDS) + unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 6bd8e4f..6fef2b3 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -28,7 +28,7 @@ import kafka.message._ import scala.math._ import joptsimple._ -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Time, Utils} /** * This test does linear writes using either a kafka log or a file and measures throughput and latency. 
@@ -201,7 +201,7 @@ object TestLinearWriteSpeed { class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable { Utils.delete(dir) - val log = new Log(dir, config, 0L, scheduler, SystemTime) + val log = new Log(dir, config, 0L, scheduler, Time.SYSTEM) def write(): Int = { log.append(messages, true) messages.sizeInBytes http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 7cb5b6e..7636d96 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -23,8 +23,9 @@ import java.util.Random import java.util.concurrent._ import joptsimple._ -import kafka.server.{DelayedOperationPurgatory, DelayedOperation} +import kafka.server.{DelayedOperation, DelayedOperationPurgatory} import kafka.utils._ +import org.apache.kafka.common.utils.Time import scala.math._ import scala.collection.JavaConverters._ @@ -276,7 +277,7 @@ object TestPurgatoryPerformance { private class Scheduled(val operation: FakeOperation) extends Delayed { def getDelay(unit: TimeUnit): Long = { - unit.convert(max(operation.completesAt - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) + unit.convert(max(operation.completesAt - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
index 16fe788..e93cae3 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -17,17 +17,15 @@ package kafka.api -import kafka.cluster.{EndPoint, Broker} +import kafka.cluster.{Broker, EndPoint} import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError} import kafka.common._ -import kafka.message.{Message, ByteBufferMessageSet} -import kafka.utils.SystemTime - +import kafka.message.{ByteBufferMessageSet, Message} import kafka.common.TopicAndPartition - import java.nio.ByteBuffer import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.utils.Time import org.junit._ import org.scalatest.junit.JUnitSuite import org.junit.Assert._ @@ -112,7 +110,7 @@ object SerializationTestUtils { def createTestOffsetCommitRequestV2: OffsetCommitRequest = { new OffsetCommitRequest( groupId = "group 1", - retentionMs = SystemTime.milliseconds, + retentionMs = Time.SYSTEM.milliseconds, requestInfo=collection.immutable.Map( TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata"), TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata) @@ -124,8 +122,8 @@ object SerializationTestUtils { versionId = 1, groupId = "group 1", requestInfo = collection.immutable.Map( - TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds), - TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds) + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", Time.SYSTEM.milliseconds), + TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, Time.SYSTEM.milliseconds) )) } http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala ---------------------------------------------------------------------- 
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index 8d48609..f7dd40f 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -51,7 +51,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, "failed to send/process notification message in the timeout period.") /*There is no easy way to test that purging. Even if we mock kafka time with MockTime, the purging compares kafka time with the time stored in zookeeper stat and the - embeded zookeeper server does not provide a way to mock time. so to test purging we will have to use SystemTime.sleep(changeExpirationMs + 1) issue a write and check + embeded zookeeper server does not provide a way to mock time. 
so to test purging we will have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size) however even after that the assertion can fail as the second node it self can be deleted depending on how threads get scheduled.*/ http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index 6430b33..63afd4e 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -20,14 +20,12 @@ package kafka.controller import java.util.Properties import java.util.concurrent.LinkedBlockingQueue -import kafka.api.RequestOrResponse import kafka.common.TopicAndPartition import kafka.integration.KafkaServerTestHarness import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils._ import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.requests.{AbstractRequestResponse, AbstractRequest} -import org.apache.kafka.common.utils.SystemTime +import org.apache.kafka.common.utils.Time import org.apache.log4j.{Level, Logger} import org.junit.{After, Before, Test} @@ -150,7 +148,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { } class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig, metrics: Metrics) - extends ControllerChannelManager(controllerContext, config, new SystemTime, metrics) { + extends ControllerChannelManager(controllerContext, config, Time.SYSTEM, metrics) { def stopSendThread(brokerId: Int) { val requestThread = brokerStateInfo(brokerId).requestSendThread 
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index a050bb3..791bdb0 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -35,7 +35,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin val tmpDir = TestUtils.tempDir() val logDir = TestUtils.randomPartitionLogDir(tmpDir) - val time = new MockTime(0) + val time = new MockTime(0, 0) val logConfig = LogConfig() @After http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala index 36c61d6..5e029fc 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala @@ -44,7 +44,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging val compactionLag = 1 * msPerHour assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0) - val time = new MockTime(1400000000000L) // Tue May 13 16:53:20 UTC 2014 + val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` val cleanerBackOffMs = 200L val segmentSize = 100 val deleteDelay = 1000 http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 6e5806f..0cd52d6 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -40,7 +40,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer) logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) val logConfig = LogConfig(logProps) - val time = new MockTime(1400000000000L) // Tue May 13 16:53:20 UTC 2014 + val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` @After def tearDown(): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 7f78148..f02c5cb 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -23,10 +23,9 @@ import kafka.common.LongRef import org.junit.{After, Test} import kafka.utils.TestUtils import kafka.message._ -import kafka.utils.SystemTime +import org.apache.kafka.common.utils.Time import scala.collection._ - import scala.collection.mutable.ListBuffer class LogSegmentTest { @@ -42,7 +41,7 @@ class LogSegmentTest { timeIdxFile.delete() val idx = new OffsetIndex(idxFile, offset, 1000) val timeIdx = new TimeIndex(timeIdxFile, offset, 1500) - val seg = new LogSegment(ms, idx, timeIdx, offset, indexIntervalBytes, 0, SystemTime) + val seg = new LogSegment(ms, idx, timeIdx, offset, indexIntervalBytes, 0, Time.SYSTEM) segments += seg seg } @@ -296,7 +295,7 @@ class LogSegmentTest { /* create a segment with pre allocate */ def createSegment(offset: Long, 
fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = { val tempDir = TestUtils.tempDir() - val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate) + val seg = new LogSegment(tempDir, offset, 10, 1000, 0, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate) segments += seg seg } @@ -317,7 +316,7 @@ class LogSegmentTest { @Test def testCreateWithInitFileSizeClearShutdown() { val tempDir = TestUtils.tempDir() - val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true) + val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true) val ms = messages(50, "hello", "there") seg.append(50, Message.NoTimestamp, -1L, ms) @@ -333,7 +332,7 @@ class LogSegmentTest { //After close, file should be trimmed assertEquals(oldSize, seg.log.file.length) - val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true) + val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, true, 512*1024*1024, true) segments += segReopen val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None) http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 317f3d6..7d0764b 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.TopicPartition import 
org.apache.kafka.common.requests.{ProduceRequest, RequestHeader} -import org.apache.kafka.common.utils.SystemTime +import org.apache.kafka.common.utils.Time import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.common.record.MemoryRecords @@ -52,7 +52,7 @@ class SocketServerTest extends JUnitSuite { props.put("connections.max.idle.ms", "60000") val config = KafkaConfig.fromProps(props) val metrics = new Metrics - val server = new SocketServer(config, metrics, new SystemTime) + val server = new SocketServer(config, metrics, Time.SYSTEM) server.startup() val sockets = new ArrayBuffer[Socket] @@ -239,7 +239,7 @@ class SocketServerTest extends JUnitSuite { val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum") val serverMetrics = new Metrics() - val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime()) + val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM) try { overrideServer.startup() // make the maximum allowable number of connections @@ -269,7 +269,7 @@ class SocketServerTest extends JUnitSuite { overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0") val serverMetrics = new Metrics - val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime) + val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM) try { overrideServer.startup() val sslContext = SSLContext.getInstance("TLSv1.2") @@ -317,7 +317,7 @@ class SocketServerTest extends JUnitSuite { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) val serverMetrics = new Metrics var conn: Socket = null - val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, new SystemTime) { + val overrideServer = new 
SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM) { override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs, protocol, config.values, metrics) { @@ -367,7 +367,7 @@ class SocketServerTest extends JUnitSuite { props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "100") val serverMetrics = new Metrics var conn: Socket = null - val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, new SystemTime) + val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM) try { overrideServer.startup() conn = connect(overrideServer) http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index e84c498..f5943d6 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -36,6 +36,7 @@ import kafka.utils.TestUtils._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import kafka.utils._ +import org.apache.kafka.common.utils.Time @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0") class AsyncProducerTest { @@ -416,6 +417,7 @@ class AsyncProducerTest { override def nanoseconds: Long = 0L override def milliseconds: Long = 0L override def sleep(ms: Long): Unit = {} + override def hiResClockMs: Long = 0L } val handler = new DefaultEventHandler[Int,String](config, partitioner = new FixedValuePartitioner(), 
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index f4a339e..ec51e20 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -32,6 +32,7 @@ import kafka.utils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.utils.Time import org.apache.log4j.{Level, Logger} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -316,7 +317,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ // any requests should be accepted and queue up, but not handled server1.requestHandlerPool.shutdown() - val t1 = SystemTime.milliseconds + val t1 = Time.SYSTEM.milliseconds try { // this message should be assigned to partition 0 whose leader is on broker 0, but // broker 0 will not response within timeoutMs millis. 
@@ -324,7 +325,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ } catch { case _: FailedToSendMessageException => /* success */ } - val t2 = SystemTime.milliseconds + val t2 = Time.SYSTEM.milliseconds // make sure we don't wait fewer than timeoutMs assertTrue((t2-t1) >= timeoutMs) http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 7e72eec..d63afe7 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -28,6 +28,7 @@ import kafka.message._ import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.utils.Time import org.junit.Test import org.junit.Assert._ @@ -55,7 +56,7 @@ class SyncProducerTest extends KafkaServerTestHarness { val producer = new SyncProducer(new SyncProducerConfig(props)) - val firstStart = SystemTime.milliseconds + val firstStart = Time.SYSTEM.milliseconds try { val response = producer.send(produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) @@ -63,9 +64,9 @@ class SyncProducerTest extends KafkaServerTestHarness { } catch { case e: Exception => fail("Unexpected failure sending message to broker. 
" + e.getMessage) } - val firstEnd = SystemTime.milliseconds + val firstEnd = Time.SYSTEM.milliseconds assertTrue((firstEnd-firstStart) < 2000) - val secondStart = SystemTime.milliseconds + val secondStart = Time.SYSTEM.milliseconds try { val response = producer.send(produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) @@ -73,7 +74,7 @@ class SyncProducerTest extends KafkaServerTestHarness { } catch { case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage) } - val secondEnd = SystemTime.milliseconds + val secondEnd = Time.SYSTEM.milliseconds assertTrue((secondEnd-secondStart) < 2000) try { val response = producer.send(produceRequest("test", 0, @@ -216,14 +217,14 @@ class SyncProducerTest extends KafkaServerTestHarness { // any requests should be accepted and queue up, but not handled server.requestHandlerPool.shutdown() - val t1 = SystemTime.milliseconds + val t1 = Time.SYSTEM.milliseconds try { producer.send(request) fail("Should have received timeout exception since request handling is stopped.") } catch { case _: SocketTimeoutException => /* success */ } - val t2 = SystemTime.milliseconds + val t2 = Time.SYSTEM.milliseconds // make sure we don't wait fewer than timeoutMs for a response assertTrue((t2-t1) >= timeoutMs) } http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index ae0d12f..49ef9f6 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.utils.SystemTime +import org.apache.kafka.common.utils.Time import 
org.junit.{After, Before, Test} import org.junit.Assert._ @@ -55,13 +55,13 @@ class DelayedOperationTest { @Test def testRequestExpiry() { val expiration = 20L - val start = SystemTime.hiResClockMs + val start = Time.SYSTEM.hiResClockMs val r1 = new MockDelayedOperation(expiration) val r2 = new MockDelayedOperation(200000L) assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1"))) assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) r1.awaitExpiration() - val elapsed = SystemTime.hiResClockMs - start + val elapsed = Time.SYSTEM.hiResClockMs - start assertTrue("r1 completed due to expiration", r1.isCompleted()) assertFalse("r2 hasn't completed", r2.isCompleted()) assertTrue(s"Time for expiration $elapsed should at least $expiration", elapsed >= expiration) http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index e7e1554..358b2a4 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -18,9 +18,8 @@ package kafka.server import kafka.log._ import java.io.File -import kafka.utils.SystemTime import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.utils.{MockTime => JMockTime, Utils} +import org.apache.kafka.common.utils.Utils import org.easymock.EasyMock import org.junit._ import org.junit.Assert._ @@ -55,9 +54,10 @@ class HighwatermarkPersistenceTest { val scheduler = new KafkaScheduler(2) scheduler.startup val metrics = new Metrics + val time = new MockTime // create replica manager - val replicaManager = new ReplicaManager(configs.head, 
metrics, new MockTime, new JMockTime, zkUtils, scheduler, - logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower) + val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, + logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() @@ -66,9 +66,9 @@ class HighwatermarkPersistenceTest { val partition0 = replicaManager.getOrCreatePartition(topic, 0) // create leader and follower replicas val log0 = logManagers.head.createLog(TopicAndPartition(topic, 0), LogConfig()) - val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) + val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, time, 0, Some(log0)) partition0.addReplicaIfNotExists(leaderReplicaPartition0) - val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime) + val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, time) partition0.addReplicaIfNotExists(followerReplicaPartition0) replicaManager.checkpointHighWatermarks() fooPartition0Hw = hwmFor(replicaManager, topic, 0) @@ -98,9 +98,10 @@ class HighwatermarkPersistenceTest { val scheduler = new KafkaScheduler(2) scheduler.startup val metrics = new Metrics + val time = new MockTime // create replica manager - val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkUtils, - scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower) + val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, + scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() @@ -110,7 +111,7 @@ 
class HighwatermarkPersistenceTest { // create leader log val topic1Log0 = logManagers.head.createLog(TopicAndPartition(topic1, 0), LogConfig()) // create a local replica for topic1 - val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0)) + val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, time, 0, Some(topic1Log0)) topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) replicaManager.checkpointHighWatermarks() topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) @@ -126,7 +127,7 @@ class HighwatermarkPersistenceTest { // create leader log val topic2Log0 = logManagers.head.createLog(TopicAndPartition(topic2, 0), LogConfig()) // create a local replica for topic2 - val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0)) + val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, time, 0, Some(topic2Log0)) topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) replicaManager.checkpointHighWatermarks() var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 0051247..2d51be9 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -19,7 +19,8 @@ package kafka.server import java.util.Properties import org.apache.kafka.common.metrics.Metrics -import org.junit.{Test, Before, After} +import org.junit.{After, Before, Test} + import collection.mutable.HashMap import collection.mutable.Map import kafka.cluster.{Partition, 
Replica} @@ -28,8 +29,9 @@ import kafka.log.Log import org.junit.Assert._ import kafka.utils._ import java.util.concurrent.atomic.AtomicBoolean + import kafka.message.MessageSet -import org.apache.kafka.common.utils.{MockTime => JMockTime} +import org.apache.kafka.common.utils.Time class IsrExpirationTest { @@ -44,15 +46,14 @@ class IsrExpirationTest { val topic = "foo" val time = new MockTime - val jTime = new JMockTime val metrics = new Metrics var replicaManager: ReplicaManager = null @Before def setUp() { - replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false), - QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower) + replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false), + QuotaFactory.instantiate(configs.head, metrics, time).follower) } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index e3f0ad2..9cf6318 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -30,7 +30,7 @@ import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} -import org.apache.kafka.common.utils.SystemTime +import org.apache.kafka.common.utils.Time import org.junit.{After, Before, Test} class LeaderElectionTest extends ZooKeeperTestHarness { @@ -134,7 +134,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val controllerContext = new ControllerContext(zkUtils) controllerContext.liveBrokers = brokers.toSet val metrics = new Metrics - val 
controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics) + val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, metrics) controllerChannelManager.startup() try { val staleControllerEpoch = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 0f3ee63..42e9be1 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -32,7 +32,7 @@ import kafka.utils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Time, Utils} import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ import org.junit.{After, Before, Test} http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 64c67d6..e46c41f 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -24,10 +24,9 @@ import kafka.utils._ import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Time, Utils} import org.junit.{After, Before, Test} import org.junit.Assert._ - import java.util.Properties 
import java.io.File @@ -43,7 +42,6 @@ class OffsetCommitTest extends ZooKeeperTestHarness { var server: KafkaServer = null var logSize: Int = 100 var simpleConsumer: SimpleConsumer = null - var time: Time = new MockTime() @Before override def setUp() { @@ -53,8 +51,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness { config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString) val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) - time = new MockTime() - server = TestUtils.createServer(KafkaConfig.fromProps(config), time) + server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM) simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client") val consumerMetadataRequest = GroupCoordinatorRequest(group) Stream.continually { @@ -255,7 +252,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness { // committed offset should expire val commitRequest2 = OffsetCommitRequest( groupId = group, - requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", SystemTime.milliseconds - 2*24*60*60*1000L)), + requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", Time.SYSTEM.milliseconds - 2*24*60*60*1000L)), versionId = 1 ) assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get) http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index e226833..378d382 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -27,17 +27,15 @@ import 
kafka.message.{ByteBufferMessageSet, Message} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.utils.{MockTime => JMockTime} import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.easymock.EasyMock -import org.easymock.EasyMock._ +import EasyMock._ import org.junit.Assert._ import org.junit.{After, Test} class ReplicaManagerQuotasTest { val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, new Properties())) val time = new MockTime - val jTime = new JMockTime val metrics = new Metrics val message = new Message("some-data-in-a-message".getBytes()) val topicAndPartition1 = TopicAndPartition("test-topic", 1) @@ -175,7 +173,7 @@ class ReplicaManagerQuotasTest { expect(logManager.getLog(anyObject())).andReturn(Some(log)).anyTimes() replay(logManager) - replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler, logManager, + replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower) //create the two replicas http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 243e06e..c6d66ba 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -32,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import 
org.apache.kafka.common.requests.FetchRequest.PartitionData -import org.apache.kafka.common.utils.{MockTime => JMockTime} import org.apache.kafka.common.{Node, TopicPartition} import org.easymock.EasyMock import org.junit.Assert.{assertEquals, assertTrue} @@ -44,8 +43,7 @@ import scala.collection.Map class ReplicaManagerTest { val topic = "test-topic" - val time = new MockTime() - val jTime = new JMockTime + val time = new MockTime val metrics = new Metrics var zkClient : ZkClient = _ var zkUtils : ZkUtils = _ @@ -66,7 +64,7 @@ class ReplicaManagerTest { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, + val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower) try { val partition = rm.getOrCreatePartition(topic, 1) @@ -84,7 +82,7 @@ class ReplicaManagerTest { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, + val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower) try { val partition = rm.getOrCreatePartition(topic, 1) @@ -101,7 +99,7 @@ class ReplicaManagerTest { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val rm = new ReplicaManager(config, metrics, time, jTime, 
zkUtils, new MockScheduler(time), mockLogMgr, + val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName)) try { def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = { @@ -126,7 +124,7 @@ class ReplicaManagerTest { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, + val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower) try { @@ -147,8 +145,8 @@ class ReplicaManagerTest { EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() EasyMock.replay(metadataCache) - val brokerList : java.util.List[Integer] = Seq[Integer](0, 1).asJava - val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1).asJava + val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava + val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava val partition = rm.getOrCreatePartition(topic, 0) partition.getOrCreateReplica(0) @@ -197,7 +195,7 @@ class ReplicaManagerTest { props.put("broker.id", Int.box(0)) val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, + val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName)) try { val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, 
"host1", 1), new Broker(1, "host2", 2)) http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 340b05f..b1ebeee 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -19,17 +19,16 @@ package kafka.server import kafka.api._ import kafka.utils._ import kafka.cluster.Replica +import kafka.common.TopicAndPartition import kafka.log.Log import kafka.message.{ByteBufferMessageSet, Message, MessageSet} import kafka.server.QuotaFactory.UnboundedQuota import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.utils.{MockTime => JMockTime} import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.junit.{After, Before, Test} import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean -import kafka.common.TopicAndPartition import org.apache.kafka.common.TopicPartition import org.easymock.EasyMock import org.junit.Assert._ @@ -48,7 +47,6 @@ class SimpleFetchTest { // set the replica manager with the partition val time = new MockTime - val jTime = new JMockTime val metrics = new Metrics val leaderLEO = 20L val followerLEO = 15L @@ -98,7 +96,7 @@ class SimpleFetchTest { EasyMock.replay(logManager) // create the replica manager - replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler, logManager, + replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower) // add the partition with two replicas, both in ISR 
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/utils/MockScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala index e9dbbb1..98ad644 100644 --- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -19,6 +19,8 @@ package kafka.utils import scala.collection.mutable.PriorityQueue import java.util.concurrent.TimeUnit +import org.apache.kafka.common.utils.Time + /** * A mock scheduler that executes tasks synchronously using a mock time instance. Tasks are executed synchronously when * the time is advanced. This class is meant to be used in conjunction with MockTime. http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/utils/MockTime.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala index 21fb4d9..2d83d65 100644 --- a/core/src/test/scala/unit/kafka/utils/MockTime.scala +++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala @@ -17,45 +17,24 @@ package kafka.utils -import java.util.concurrent._ - -import org.apache.kafka.common.utils +import org.apache.kafka.common.utils.{MockTime => JMockTime} /** * A class used for unit testing things which depend on the Time interface. - * - * This class never manually advances the clock, it only does so when you call - * sleep(ms) - * - * It also comes with an associated scheduler instance for managing background tasks in - * a deterministic way. + * There are a couple of differences between this class and `org.apache.kafka.common.utils.MockTime`: + * + * 1. This has an associated scheduler instance for managing background tasks in a deterministic way. + * 2. 
This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`. */ -class MockTime(@volatile private var currentMs: Long) extends Time { - - val scheduler = new MockScheduler(this) - - def this() = this(System.currentTimeMillis) - - def milliseconds: Long = currentMs +class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) { - def nanoseconds: Long = - TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS) + def this() = this(System.currentTimeMillis(), System.nanoTime()) - def sleep(ms: Long) { - this.currentMs += ms + val scheduler = new MockScheduler(this) + + override def sleep(ms: Long) { + super.sleep(ms) scheduler.tick() } - - override def toString = "MockTime(%d)".format(milliseconds) } - -object MockTime { - implicit def toCommonTime(time: MockTime): utils.Time = new utils.Time { - override def nanoseconds(): Long = time.nanoseconds - - override def milliseconds(): Long = time.milliseconds - - override def sleep(ms: Long): Unit = time.sleep(ms) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index b45600d..33ab58c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -25,10 +25,12 @@ import java.util.{Properties, Random} import java.security.cert.X509Certificate import javax.net.ssl.X509TrustManager import charset.Charset + import kafka.security.auth.{Acl, Authorizer, Resource} import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.test.TestSslUtils + import scala.collection.mutable.{ArrayBuffer, 
ListBuffer} import kafka.server._ import kafka.producer._ @@ -47,6 +49,7 @@ import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.network.Mode import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer} +import org.apache.kafka.common.utils.Time import org.apache.kafka.test.{TestUtils => JTestUtils} import scala.collection.Map @@ -114,7 +117,7 @@ object TestUtils extends Logging { * * @param config The configuration of the server */ - def createServer(config: KafkaConfig, time: Time = SystemTime): KafkaServer = { + def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = { val server = new KafkaServer(config, time) server.startup() server http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index df6da21..dc6907f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -242,7 +241,8 @@ public class KafkaStreams { */ public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier) { // create the metrics - final Time time = new SystemTime(); + final Time time = 
Time.SYSTEM; + processId = UUID.randomUUID(); this.config = config; http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 798c097..2e16af2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; @@ -57,7 +56,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String metricScope, Time time) { this.inner = inner; this.metricScope = metricScope; - this.time = time != null ? time : new SystemTime(); + this.time = time != null ? 
time : Time.SYSTEM; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 09952a3..6533460 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.KeyValue; @@ -43,7 +42,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> { public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) { this.inner = inner; this.metricScope = metricScope; - this.time = time != null ? time : new SystemTime(); + this.time = time != null ? 
time : Time.SYSTEM; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 2d87c16..111a271 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; @@ -145,7 +144,7 @@ public class RegexSourceIntegrationTest { final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, new DefaultKafkaClientSupplier(), - originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); + originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), Time.SYSTEM); final TestCondition oneTopicAdded = new TestCondition() { @Override @@ -200,7 +199,7 @@ public class RegexSourceIntegrationTest { final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, new DefaultKafkaClientSupplier(), - originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); + originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), 
Time.SYSTEM); streamThreads[0] = testStreamThread; @@ -308,7 +307,7 @@ public class RegexSourceIntegrationTest { final TestStreamThread leaderTestStreamThread = new TestStreamThread(builderLeader, streamsConfig, new DefaultKafkaClientSupplier(), - originalLeaderThread.applicationId, originalLeaderThread.clientId, originalLeaderThread.processId, new Metrics(), new SystemTime()); + originalLeaderThread.applicationId, originalLeaderThread.clientId, originalLeaderThread.processId, new Metrics(), Time.SYSTEM); leaderStreamThreads[0] = leaderTestStreamThread; @@ -328,7 +327,7 @@ public class RegexSourceIntegrationTest { final TestStreamThread followerTestStreamThread = new TestStreamThread(builderFollower, streamsConfig, new DefaultKafkaClientSupplier(), - originalFollowerThread.applicationId, originalFollowerThread.clientId, originalFollowerThread.processId, new Metrics(), new SystemTime()); + originalFollowerThread.applicationId, originalFollowerThread.clientId, originalFollowerThread.processId, new Metrics(), Time.SYSTEM); followerStreamThreads[0] = followerTestStreamThread; http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 2a1f753..303e3f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration; import kafka.admin.AdminClient; +import kafka.server.KafkaConfig$; import kafka.tools.StreamsResetter; import kafka.utils.MockTime; import kafka.utils.ZkUtils; @@ -63,9 +64,17 @@ import static org.hamcrest.MatcherAssert.assertThat; */ 
public class ResetIntegrationTest { private static final int NUM_BROKERS = 1; + @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - private final MockTime mockTime = CLUSTER.time; + public static final EmbeddedKafkaCluster CLUSTER; + static { + final Properties props = new Properties(); + // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable + // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially + // very long sleep times + props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); + CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props); + } private static final String APP_ID = "cleanup-integration-test"; private static final String INPUT_TOPIC = "inputTopic"; @@ -76,12 +85,14 @@ public class ResetIntegrationTest { private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; - private static final int TIMEOUT_MULTIPLYER = 5; + private static final int TIMEOUT_MULTIPLIER = 5; - private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); private static int testNo = 0; private static AdminClient adminClient = null; + private final MockTime mockTime = CLUSTER.time; + private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); + @BeforeClass public static void startKafkaCluster() throws Exception { CLUSTER.createTopic(INPUT_TOPIC); @@ -111,8 +122,8 @@ public class ResetIntegrationTest { Thread.sleep(50); try { - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, - "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Test consumer group active 
even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); } catch (GroupCoordinatorNotAvailableException e) { continue; } catch (IllegalArgumentException e) { @@ -154,15 +165,15 @@ public class ResetIntegrationTest { ).get(0); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); streams.cleanUp(); cleanGlobal(INTERMEDIATE_USER_TOPIC); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); @@ -183,8 +194,8 @@ public class ResetIntegrationTest { assertThat(resultRerun, equalTo(result)); assertThat(resultRerun2, equalTo(result2)); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(INTERMEDIATE_USER_TOPIC); 
CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC); @@ -227,15 +238,15 @@ public class ResetIntegrationTest { 60000); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); streams.cleanUp(); cleanGlobal(null); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); @@ -250,8 +261,8 @@ public class ResetIntegrationTest { assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(null); } http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index fc28ad5..9bef23e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -36,29 +36,35 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; + private final Properties brokerConfig; public EmbeddedKafkaCluster(final int numBrokers) { + this(numBrokers, new Properties()); + } + + public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { brokers = new KafkaEmbedded[numBrokers]; + this.brokerConfig = brokerConfig; } - public MockTime time = new MockTime(); + public final MockTime time = new MockTime(); /** * Creates and starts a Kafka cluster. 
*/ public void start() throws IOException, InterruptedException { - final Properties brokerConfig = new Properties(); - log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); zookeeper = new EmbeddedZookeeper(); log.debug("ZooKeeper instance is running at {}", zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); - brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); - brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); - brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); - brokerConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false); + + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false); for (int i = 0; i < brokers.length; i++) { brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i); @@ -70,6 +76,11 @@ public class EmbeddedKafkaCluster extends ExternalResource { } } + private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { + if (!props.containsKey(propertyKey)) + brokerConfig.put(propertyKey, propertyValue); + } + /** * Stop the Kafka cluster. 
*/ http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 117e6ff..fe20225 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.integration.utils; -import kafka.utils.Time; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -25,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig;