This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new dca4304 MINOR: Change "no such session ID" log to debug (#5316)
dca4304 is described below
commit dca4304156373ee5ea5ec5385149425aa7bbc13c
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Sat Jul 21 03:32:56 2018 -0700
MINOR: Change "no such session ID" log to debug (#5316)
Improve the log messages while at it and fix some code style issues.
Reviewers: Ismael Juma <[email protected]>
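The style cleanup in the diff below mostly applies the common Scala convention of
dropping empty parentheses from side-effect-free accessors, while side-effecting
methods keep theirs. A minimal sketch of that convention, using a hypothetical
class that is not part of this patch:

    // Hypothetical example, not from this commit: illustrates the parens
    // convention applied throughout FetchSession.scala below.
    class CachedCounter(val name: String) {
      private var count = 0

      // Pure accessor: declared without parentheses, so callers write `c.size`.
      def size: Int = count

      // Side-effecting method: keeps its parentheses, callers write `c.increment()`.
      def increment(): Unit = count += 1

      override def toString = s"CachedCounter(name=$name, size=$size)"
    }

    object ParensConventionExample extends App {
      val c = new CachedCounter("demo")
      c.increment()
      println(c.size)  // 1
      println(c)       // CachedCounter(name=demo, size=1)
    }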
---
.../src/main/scala/kafka/server/FetchSession.scala | 147 ++++++++++-----------
.../scala/unit/kafka/server/FetchSessionTest.scala | 64 ++++-----
2 files changed, 105 insertions(+), 106 deletions(-)
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 68f79ca..64bc773 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -92,22 +92,22 @@ class CachedPartition(val topic: String,
this(topic, partition, -1, -1, -1, -1, -1)
def this(part: TopicPartition) =
- this(part.topic(), part.partition())
+ this(part.topic, part.partition)
def this(part: TopicPartition, reqData: FetchRequest.PartitionData) =
- this(part.topic(), part.partition(),
+ this(part.topic, part.partition,
reqData.maxBytes, reqData.fetchOffset, -1,
reqData.logStartOffset, -1)
def this(part: TopicPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponse.PartitionData[Records]) =
- this(part.topic(), part.partition(),
+ this(part.topic, part.partition,
reqData.maxBytes, reqData.fetchOffset, respData.highWatermark,
reqData.logStartOffset, respData.logStartOffset)
- def topicPartition() = new TopicPartition(topic, partition)
+ def topicPartition = new TopicPartition(topic, partition)
- def reqData() = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes)
+ def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes)
def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
// Update our cached request parameters.
@@ -129,7 +129,7 @@ class CachedPartition(val topic: String,
def maybeUpdateResponseData(respData: FetchResponse.PartitionData[Records],
updateResponseData: Boolean): Boolean = {
// Check the response data.
var mustRespond = false
- if ((respData.records != null) && (respData.records.sizeInBytes() > 0)) {
+ if ((respData.records != null) && (respData.records.sizeInBytes > 0)) {
// Partitions with new data are always included in the response.
mustRespond = true
}
@@ -143,7 +143,7 @@ class CachedPartition(val topic: String,
if (updateResponseData)
localLogStartOffset = respData.logStartOffset
}
- if (respData.error.code() != 0) {
+ if (respData.error.code != 0) {
// Partitions with errors are always included in the response.
// We also set the cached highWatermark to an invalid offset, -1.
// This ensures that when the error goes away, we re-send the partition.
@@ -154,7 +154,7 @@ class CachedPartition(val topic: String,
mustRespond
}
- override def hashCode() = (31 * partition) + topic.hashCode
+ override def hashCode = (31 * partition) + topic.hashCode
def canEqual(that: Any) = that.isInstanceOf[CachedPartition]
@@ -166,7 +166,7 @@ class CachedPartition(val topic: String,
case _ => false
}
- override def toString() = synchronized {
+ override def toString = synchronized {
"CachedPartition(topic=" + topic +
", partition=" + partition +
", maxBytes=" + maxBytes +
@@ -203,23 +203,23 @@ case class FetchSession(val id: Int,
// If this is -1, the Session is not in the cache.
var cachedSize = -1
- def size(): Int = synchronized {
- partitionMap.size()
+ def size: Int = synchronized {
+ partitionMap.size
}
- def isEmpty(): Boolean = synchronized {
+ def isEmpty: Boolean = synchronized {
partitionMap.isEmpty
}
- def lastUsedKey(): LastUsedKey = synchronized {
+ def lastUsedKey: LastUsedKey = synchronized {
LastUsedKey(lastUsedMs, id)
}
- def evictableKey(): EvictableKey = synchronized {
+ def evictableKey: EvictableKey = synchronized {
EvictableKey(privileged, cachedSize, id)
}
- def metadata(): JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
+ def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized {
Option(partitionMap.find(new CachedPartition(topicPartition))).map(_.fetchOffset)
@@ -234,7 +234,7 @@ case class FetchSession(val id: Int,
val added = new TL
val updated = new TL
val removed = new TL
- fetchData.entrySet().iterator().asScala.foreach(entry => {
+ fetchData.entrySet.iterator.asScala.foreach(entry => {
val topicPart = entry.getKey
val reqData = entry.getValue
val newCachedPart = new CachedPartition(topicPart, reqData)
@@ -247,18 +247,18 @@ case class FetchSession(val id: Int,
updated.add(topicPart)
}
})
- toForget.iterator().asScala.foreach(p => {
- if (partitionMap.remove(new CachedPartition(p.topic(), p.partition()))) {
+ toForget.iterator.asScala.foreach(p => {
+ if (partitionMap.remove(new CachedPartition(p.topic, p.partition))) {
removed.add(p)
}
})
(added, updated, removed)
}
- override def toString(): String = synchronized {
+ override def toString: String = synchronized {
"FetchSession(id=" + id +
", privileged=" + privileged +
- ", partitionMap.size=" + partitionMap.size() +
+ ", partitionMap.size=" + partitionMap.size +
", creationMs=" + creationMs +
", creationMs=" + lastUsedMs +
", epoch=" + epoch + ")"
@@ -308,7 +308,7 @@ class SessionErrorContext(val error: Errors,
override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {}
override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
- FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet().iterator())
+ FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator)
}
// Because of the fetch session error, we don't know what partitions were supposed to be in this request.
@@ -328,15 +328,15 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
Option(fetchData.get(part)).map(_.fetchOffset)
override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
- fetchData.entrySet().asScala.foreach(entry => fun(entry.getKey, entry.getValue))
+ fetchData.entrySet.asScala.foreach(entry => fun(entry.getKey, entry.getValue))
}
override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
- FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
+ FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
}
override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
- debug(s"Sessionless fetch context returning ${partitionsToLogString(updates.keySet())}")
+ debug(s"Sessionless fetch context returning ${partitionsToLogString(updates.keySet)}")
new FetchResponse(Errors.NONE, updates, 0, INVALID_SESSION_ID)
}
}
@@ -359,17 +359,17 @@ class FullFetchContext(private val time: Time,
Option(fetchData.get(part)).map(_.fetchOffset)
override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
- fetchData.entrySet().asScala.foreach(entry => fun(entry.getKey, entry.getValue))
+ fetchData.entrySet.asScala.foreach(entry => fun(entry.getKey, entry.getValue))
}
override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
- FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
+ FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
}
override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
- def createNewSession(): FetchSession.CACHE_MAP = {
- val cachedPartitions = new FetchSession.CACHE_MAP(updates.size())
- updates.entrySet().asScala.foreach(entry => {
+ def createNewSession: FetchSession.CACHE_MAP = {
+ val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+ updates.entrySet.asScala.foreach(entry => {
val part = entry.getKey
val respData = entry.getValue
val reqData = fetchData.get(part)
@@ -378,9 +378,9 @@ class FullFetchContext(private val time: Time,
cachedPartitions
}
val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
- updates.size(), createNewSession)
+ updates.size, () => createNewSession)
debug(s"Full fetch context with session id $responseSessionId returning " +
- s"${partitionsToLogString(updates.keySet())}")
+ s"${partitionsToLogString(updates.keySet)}")
new FetchResponse(Errors.NONE, updates, 0, responseSessionId)
}
}
@@ -401,8 +401,8 @@ class IncrementalFetchContext(private val time: Time,
override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
// Take the session lock and iterate over all the cached partitions.
session.synchronized {
- session.partitionMap.iterator().asScala.foreach(part => {
- fun(new TopicPartition(part.topic, part.partition), part.reqData())
+ session.partitionMap.iterator.asScala.foreach(part => {
+ fun(new TopicPartition(part.topic, part.partition), part.reqData)
})
}
}
@@ -416,7 +416,7 @@ class IncrementalFetchContext(private val time: Time,
var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
override def hasNext: Boolean = {
- while ((nextElement == null) && iter.hasNext()) {
+ while ((nextElement == null) && iter.hasNext) {
val element = iter.next()
val topicPart = element.getKey
val respData = element.getValue
@@ -438,23 +438,23 @@ class IncrementalFetchContext(private val time: Time,
}
override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
- if (!hasNext()) throw new NoSuchElementException()
+ if (!hasNext) throw new NoSuchElementException
val element = nextElement
nextElement = null
element
}
- override def remove() = throw new UnsupportedOperationException()
+ override def remove() = throw new UnsupportedOperationException
}
override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
session.synchronized {
- val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+ val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch)
if (session.epoch != expectedEpoch) {
- FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet().iterator())
+ FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator)
} else {
// Pass the partition iterator which updates neither the fetch context nor the partition map.
- FetchResponse.sizeOf(versionId, new PartitionIterator(updates.entrySet().iterator(), false))
+ FetchResponse.sizeOf(versionId, new PartitionIterator(updates.entrySet.iterator, false))
}
}
}
@@ -463,19 +463,19 @@ class IncrementalFetchContext(private val time: Time,
session.synchronized {
// Check to make sure that the session epoch didn't change in between
// creating this fetch context and generating this response.
- val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+ val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch)
if (session.epoch != expectedEpoch) {
info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
s"got ${session.epoch}. Possible duplicate request.")
new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, 0, session.id)
} else {
// Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent
- val partitionIter = new PartitionIterator(updates.entrySet().iterator(), true)
- while (partitionIter.hasNext()) {
+ val partitionIter = new PartitionIterator(updates.entrySet.iterator, true)
+ while (partitionIter.hasNext) {
partitionIter.next()
}
debug(s"Incremental fetch context with session id ${session.id} returning " +
- s"${partitionsToLogString(updates.keySet())}")
+ s"${partitionsToLogString(updates.keySet)}")
new FetchResponse(Errors.NONE, updates, 0, session.id)
}
}
@@ -485,7 +485,7 @@ class IncrementalFetchContext(private val time: Time,
session.synchronized {
// Check to make sure that the session epoch didn't change in between
// creating this fetch context and generating this response.
- val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+ val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch)
if (session.epoch != expectedEpoch) {
info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
s"got ${session.epoch}. Possible duplicate request.")
@@ -570,14 +570,14 @@ class FetchSessionCache(private val maxEntries: Int,
/**
* Get the number of entries currently in the fetch session cache.
*/
- def size(): Int = synchronized {
+ def size: Int = synchronized {
sessions.size
}
/**
* Get the total number of cached partitions.
*/
- def totalPartitions(): Long = synchronized {
+ def totalPartitions: Long = synchronized {
numPartitions
}
@@ -614,7 +614,7 @@ class FetchSessionCache(private val maxEntries: Int,
val partitionMap = createPartitions()
val session = new FetchSession(newSessionId(), privileged, partitionMap,
now, now, JFetchMetadata.nextEpoch(INITIAL_EPOCH))
- debug(s"Created fetch session ${session.toString()}")
+ debug(s"Created fetch session ${session.toString}")
sessions.put(session.id, session)
touch(session, now)
session.id
@@ -639,12 +639,12 @@ class FetchSessionCache(private val maxEntries: Int,
*/
def tryEvict(privileged: Boolean, key: EvictableKey, now: Long): Boolean = synchronized {
// Try to evict an entry which is stale.
- val lastUsedEntry = lastUsed.firstEntry()
+ val lastUsedEntry = lastUsed.firstEntry
if (lastUsedEntry == null) {
trace("There are no cache entries to evict.")
false
- } else if (now - lastUsedEntry.getKey().lastUsedMs > evictionMs) {
- val session = lastUsedEntry.getValue()
+ } else if (now - lastUsedEntry.getKey.lastUsedMs > evictionMs) {
+ val session = lastUsedEntry.getValue
trace(s"Evicting stale FetchSession ${session.id}.")
remove(session)
evictionsMeter.mark()
@@ -653,16 +653,16 @@ class FetchSessionCache(private val maxEntries: Int,
// If there are no stale entries, check the first evictable entry.
// If it is less valuable than our proposed entry, evict it.
val map = if (privileged) evictableByPrivileged else evictableByAll
- val evictableEntry = map.firstEntry()
+ val evictableEntry = map.firstEntry
if (evictableEntry == null) {
trace("No evictable entries found.")
false
- } else if (key.compareTo(evictableEntry.getKey()) < 0) {
- trace(s"Can't evict ${evictableEntry.getKey()} with ${key.toString}")
+ } else if (key.compareTo(evictableEntry.getKey) < 0) {
+ trace(s"Can't evict ${evictableEntry.getKey} with ${key.toString}")
false
} else {
- trace(s"Evicting ${evictableEntry.getKey()} with ${key.toString}.")
- remove(evictableEntry.getValue())
+ trace(s"Evicting ${evictableEntry.getKey} with ${key.toString}.")
+ remove(evictableEntry.getValue)
evictionsMeter.mark()
true
}
@@ -685,8 +685,8 @@ class FetchSessionCache(private val maxEntries: Int,
*/
def remove(session: FetchSession): Option[FetchSession] = synchronized {
val evictableKey = session.synchronized {
- lastUsed.remove(session.lastUsedKey())
- session.evictableKey()
+ lastUsed.remove(session.lastUsedKey)
+ session.evictableKey
}
evictableByAll.remove(evictableKey)
evictableByPrivileged.remove(evictableKey)
@@ -706,19 +706,19 @@ class FetchSessionCache(private val maxEntries: Int,
def touch(session: FetchSession, now: Long): Unit = synchronized {
session.synchronized {
// Update the lastUsed map.
- lastUsed.remove(session.lastUsedKey())
+ lastUsed.remove(session.lastUsedKey)
session.lastUsedMs = now
- lastUsed.put(session.lastUsedKey(), session)
+ lastUsed.put(session.lastUsedKey, session)
val oldSize = session.cachedSize
if (oldSize != -1) {
- val oldEvictableKey = session.evictableKey()
+ val oldEvictableKey = session.evictableKey
evictableByPrivileged.remove(oldEvictableKey)
evictableByAll.remove(oldEvictableKey)
numPartitions = numPartitions - oldSize
}
- session.cachedSize = session.size()
- val newEvictableKey = session.evictableKey()
+ session.cachedSize = session.size
+ val newEvictableKey = session.evictableKey
if ((!session.privileged) || (now - session.creationMs > evictionMs)) {
evictableByPrivileged.put(newEvictableKey, session)
}
@@ -738,35 +738,34 @@ class FetchManager(private val time: Time,
isFollower: Boolean): FetchContext = {
val context = if (reqMetadata.isFull) {
var removedFetchSessionStr = ""
- if (reqMetadata.sessionId() != INVALID_SESSION_ID) {
+ if (reqMetadata.sessionId != INVALID_SESSION_ID) {
// Any session specified in a FULL fetch request will be closed.
- if (cache.remove(reqMetadata.sessionId()).isDefined) {
- removedFetchSessionStr = s" Removed fetch session ${reqMetadata.sessionId()}."
+ if (cache.remove(reqMetadata.sessionId).isDefined) {
+ removedFetchSessionStr = s" Removed fetch session ${reqMetadata.sessionId}."
}
}
var suffix = ""
- val context = if (reqMetadata.epoch() == FINAL_EPOCH) {
+ val context = if (reqMetadata.epoch == FINAL_EPOCH) {
// If the epoch is FINAL_EPOCH, don't try to create a new session.
suffix = " Will not try to create a new session."
new SessionlessFetchContext(fetchData)
} else {
new FullFetchContext(time, cache, reqMetadata, fetchData, isFollower)
}
- debug(s"Created a new full FetchContext with ${partitionsToLogString(fetchData.keySet())}."+
+ debug(s"Created a new full FetchContext with ${partitionsToLogString(fetchData.keySet)}."+
s"${removedFetchSessionStr}${suffix}")
context
} else {
cache.synchronized {
- cache.get(reqMetadata.sessionId()) match {
+ cache.get(reqMetadata.sessionId) match {
case None => {
- info(s"Created a new error FetchContext for session id ${reqMetadata.sessionId()}: " +
- "no such session ID found.")
+ debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
}
case Some(session) => session.synchronized {
- if (session.epoch != reqMetadata.epoch()) {
- debug(s"Created a new error FetchContext for session id ${session.id}: expected " +
- s"epoch ${session.epoch}, but got epoch ${reqMetadata.epoch()}.")
+ if (session.epoch != reqMetadata.epoch) {
+ debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
+ s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
} else {
val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
@@ -777,7 +776,7 @@ class FetchManager(private val time: Time,
cache.remove(session)
new SessionlessFetchContext(fetchData)
} else {
- if (session.size() != session.cachedSize) {
+ if (session.size != session.cachedSize) {
// If the number of partitions in the session changed, update the session's
// position in the cache.
cache.touch(session, session.lastUsedMs)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index ae001a3..c4a9625 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -49,7 +49,7 @@ class FetchSessionTest {
assertTrue("Missing session " + i + " out of " + sessionIds.size + "(" + sessionId + ")",
cache.get(sessionId).isDefined)
}
- assertEquals(sessionIds.size, cache.size())
+ assertEquals(sessionIds.size, cache.size)
}
private def dummyCreate(size: Int)() = {
@@ -63,7 +63,7 @@ class FetchSessionTest {
@Test
def testSessionCache(): Unit = {
val cache = new FetchSessionCache(3, 100)
- assertEquals(0, cache.size())
+ assertEquals(0, cache.size)
val id1 = cache.maybeCreateSession(0, false, 10, dummyCreate(10))
val id2 = cache.maybeCreateSession(10, false, 20, dummyCreate(20))
val id3 = cache.maybeCreateSession(20, false, 30, dummyCreate(30))
@@ -86,44 +86,44 @@ class FetchSessionTest {
@Test
def testResizeCachedSessions(): Unit = {
val cache = new FetchSessionCache(2, 100)
- assertEquals(0, cache.totalPartitions())
- assertEquals(0, cache.size())
- assertEquals(0, cache.evictionsMeter.count())
+ assertEquals(0, cache.totalPartitions)
+ assertEquals(0, cache.size)
+ assertEquals(0, cache.evictionsMeter.count)
val id1 = cache.maybeCreateSession(0, false, 2, dummyCreate(2))
assertTrue(id1 > 0)
assertCacheContains(cache, id1)
val session1 = cache.get(id1).get
- assertEquals(2, session1.size())
- assertEquals(2, cache.totalPartitions())
- assertEquals(1, cache.size())
- assertEquals(0, cache.evictionsMeter.count())
+ assertEquals(2, session1.size)
+ assertEquals(2, cache.totalPartitions)
+ assertEquals(1, cache.size)
+ assertEquals(0, cache.evictionsMeter.count)
val id2 = cache.maybeCreateSession(0, false, 4, dummyCreate(4))
val session2 = cache.get(id2).get
assertTrue(id2 > 0)
assertCacheContains(cache, id1, id2)
- assertEquals(6, cache.totalPartitions())
- assertEquals(2, cache.size())
- assertEquals(0, cache.evictionsMeter.count())
+ assertEquals(6, cache.totalPartitions)
+ assertEquals(2, cache.size)
+ assertEquals(0, cache.evictionsMeter.count)
cache.touch(session1, 200)
cache.touch(session2, 200)
val id3 = cache.maybeCreateSession(200, false, 5, dummyCreate(5))
assertTrue(id3 > 0)
assertCacheContains(cache, id2, id3)
- assertEquals(9, cache.totalPartitions())
- assertEquals(2, cache.size())
- assertEquals(1, cache.evictionsMeter.count())
+ assertEquals(9, cache.totalPartitions)
+ assertEquals(2, cache.size)
+ assertEquals(1, cache.evictionsMeter.count)
cache.remove(id3)
assertCacheContains(cache, id2)
- assertEquals(1, cache.size())
- assertEquals(1, cache.evictionsMeter.count())
- assertEquals(4, cache.totalPartitions())
- val iter = session2.partitionMap.iterator()
+ assertEquals(1, cache.size)
+ assertEquals(1, cache.evictionsMeter.count)
+ assertEquals(4, cache.totalPartitions)
+ val iter = session2.partitionMap.iterator
iter.next()
iter.remove()
- assertEquals(3, session2.size())
+ assertEquals(3, session2.size)
assertEquals(4, session2.cachedSize)
cache.touch(session2, session2.lastUsedMs)
- assertEquals(3, cache.totalPartitions())
+ assertEquals(3, cache.totalPartitions)
}
val EMPTY_PART_LIST = Collections.unmodifiableList(new util.ArrayList[TopicPartition]())
@@ -220,15 +220,15 @@ class FetchSessionTest {
val context8 = fetchManager.newContext(
new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData8, EMPTY_PART_LIST, false)
assertEquals(classOf[SessionlessFetchContext], context8.getClass)
- assertEquals(0, cache.size())
+ assertEquals(0, cache.size)
val respData8 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData8.put(new TopicPartition("bar", 0),
new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
respData8.put(new TopicPartition("bar", 1),
new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
val resp8 = context8.updateAndGenerateResponseData(respData8)
- assertEquals(Errors.NONE, resp8.error())
- nextSessionId = resp8.sessionId()
+ assertEquals(Errors.NONE, resp8.error)
+ nextSessionId = resp8.sessionId
} while (nextSessionId == prevSessionId)
}
@@ -277,9 +277,9 @@ class FetchSessionTest {
respData2.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val resp2 = context2.updateAndGenerateResponseData(respData2)
- assertEquals(Errors.NONE, resp2.error())
- assertEquals(1, resp2.responseData().size())
- assertTrue(resp2.sessionId() > 0)
+ assertEquals(Errors.NONE, resp2.error)
+ assertEquals(1, resp2.responseData.size)
+ assertTrue(resp2.sessionId > 0)
}
@Test
@@ -300,9 +300,9 @@ class FetchSessionTest {
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val resp1 = context1.updateAndGenerateResponseData(respData1)
- assertEquals(Errors.NONE, resp1.error())
+ assertEquals(Errors.NONE, resp1.error)
assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
- assertEquals(2, resp1.responseData().size())
+ assertEquals(2, resp1.responseData.size)
// Create an incremental fetch request that removes foo-0 and foo-1
// Verify that the previous fetch session was closed.
@@ -311,12 +311,12 @@ class FetchSessionTest {
removed2.add(new TopicPartition("foo", 0))
removed2.add(new TopicPartition("foo", 1))
val context2 = fetchManager.newContext(
- new JFetchMetadata(resp1.sessionId(), 1), reqData2, removed2, false)
+ new JFetchMetadata(resp1.sessionId, 1), reqData2, removed2, false)
assertEquals(classOf[SessionlessFetchContext], context2.getClass)
val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
val resp2 = context2.updateAndGenerateResponseData(respData2)
- assertEquals(INVALID_SESSION_ID, resp2.sessionId())
+ assertEquals(INVALID_SESSION_ID, resp2.sessionId)
assertTrue(resp2.responseData().isEmpty)
- assertEquals(0, cache.size())
+ assertEquals(0, cache.size)
}
}