Repository: kafka Updated Branches: refs/heads/trunk 55225bd55 -> 756ec494d
KAFKA-3682; ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full Limited number of attempts to number of map slots after the internal positionOf() goes into linear search mode. Added unit test Co-developed with mimaison Author: edoardo <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #1352 from edoardocomar/KAFKA-3682 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/756ec494 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/756ec494 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/756ec494 Branch: refs/heads/trunk Commit: 756ec494da75c49e6b537bd094b941c0492ec46e Parents: 55225bd Author: edoardo <[email protected]> Authored: Fri May 27 15:39:07 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Fri May 27 15:39:07 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/OffsetMap.scala | 5 +++++ .../src/test/scala/unit/kafka/log/OffsetMapTest.scala | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/756ec494/core/src/main/scala/kafka/log/OffsetMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala index 3893b2c..f453030 100755 --- a/core/src/main/scala/kafka/log/OffsetMap.scala +++ b/core/src/main/scala/kafka/log/OffsetMap.scala @@ -118,7 +118,12 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 + //we need to guard against attempt integer overflow if the map is full + //limit attempt to number of slots once positionOf(..) enters linear search mode + val maxAttempts = slots + hashSize - 4 do { + if(attempt >= maxAttempts) + return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) http://git-wip-us.apache.org/repos/asf/kafka/blob/756ec494/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala index f50daa4..a5bec17 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala @@ -44,7 +44,19 @@ class OffsetMapTest extends JUnitSuite { assertEquals(map.get(key(i)), -1L) } - def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes) + @Test + def testGetWhenFull() { + val map = new SkimpyOffsetMap(4096) + var i = 37L //any value would do + while (map.size < map.slots) { + map.put(key(i), i) + i = i + 1L + } + assertEquals(map.get(key(i)), -1L) + assertEquals(map.get(key(i-1L)), i-1L) + } + + def key(key: Long) = ByteBuffer.wrap(key.toString.getBytes) def validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = { val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
