Repository: kafka
Updated Branches:
  refs/heads/0.9.0 c411ec808 -> 7508ec219
KAFKA-2795: fix potential NPE in GroupMetadataManager.addGroup

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Onur Karaman, Guozhang Wang

Closes #488 from hachikuji/KAFKA-2795

(cherry picked from commit c455e608c1f2c7be6ff0a721f49c1fe3ede0165f)
Signed-off-by: Guozhang Wang <wangg...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7508ec21
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7508ec21
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7508ec21

Branch: refs/heads/0.9.0
Commit: 7508ec219022c02aedb401bb01e5fa183b0db5c1
Parents: c411ec8
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Nov 10 13:06:55 2015 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Nov 10 13:07:09 2015 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/GroupMetadataManager.scala | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/7508ec21/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index f98fc74..047970e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -67,9 +67,6 @@ class GroupMetadataManager(val brokerId: Int,
   /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
   private val offsetExpireLock = new ReentrantReadWriteLock()
 
-  /* lock for removing offsets of a range partition, it should be always called BEFORE the group lock if needed */
-  private val offsetRemoveLock = new ReentrantReadWriteLock()
-
   /* shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
@@ -116,12 +113,12 @@ class GroupMetadataManager(val brokerId: Int,
    * Add a group or get the group associated with the given groupId if it already exists
    */
   def addGroup(groupId: String, protocolType: String): GroupMetadata = {
-    addGroup(groupId, new GroupMetadata(groupId, protocolType))
-  }
-
-  private def addGroup(groupId: String, group: GroupMetadata): GroupMetadata = {
-    groupsCache.putIfNotExists(groupId, group)
-    groupsCache.get(groupId)
+    val newGroup = new GroupMetadata(groupId, protocolType)
+    val currentGroup = groupsCache.putIfNotExists(groupId, newGroup)
+    if (currentGroup != null)
+      currentGroup
+    else
+      newGroup
   }
 
   /**