Repository: kafka Updated Branches: refs/heads/trunk 7d9d1cb23 -> f62db5dd8
MINOR: Follow-up from KAFKA-2720 with comment/style fixes Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]> Closes #1513 from hachikuji/followup-for-kafka-2720 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f62db5dd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f62db5dd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f62db5dd Branch: refs/heads/trunk Commit: f62db5dd8886f467e1de7e534a9ef3a839a08e84 Parents: 7d9d1cb Author: Jason Gustafson <[email protected]> Authored: Fri Jun 17 00:02:57 2016 +0200 Committer: Ismael Juma <[email protected]> Committed: Fri Jun 17 00:02:57 2016 +0200 ---------------------------------------------------------------------- .../src/main/scala/kafka/coordinator/GroupCoordinator.scala | 8 +++++--- .../main/scala/kafka/coordinator/GroupMetadataManager.scala | 9 ++------- 2 files changed, 7 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f62db5dd/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 9c75f83..0d02a4c 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -666,11 +666,13 @@ class GroupCoordinator(val brokerId: Int, group.initNextGeneration() if (group.is(Empty)) { info(s"Group ${group.groupId} with generation ${group.generationId} is now empty") + delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => { if (errorCode != Errors.NONE.code) { - // we failed to persist the empty group. if we don't retry (which is how - // we handle the situation when a normal rebalance fails, then a coordinator - // change will cause the old generation to come back to life. + // we failed to write the empty group metadata. If the broker fails before another rebalance, + // the previous generation written to the log will become active again (and most likely timeout). + // This should be safe since there are no active members in an empty generation, so we just warn. + warn(s"Failed to write empty metadata for group ${group.groupId}: ${Errors.forCode(errorCode).message()}") } })) } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/f62db5dd/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 915c360..e75e23b 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -515,8 +515,8 @@ class GroupMetadataManager(val brokerId: Int, /** * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to * that partition. - * - * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache. + * + * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache. */ def removeGroupsForPartition(offsetsPartition: Int, onGroupUnloaded: GroupMetadata => Unit) { @@ -532,11 +532,6 @@ class GroupMetadataManager(val brokerId: Int, // to prevent coordinator's check-and-get-group race condition ownedPartitions.remove(offsetsPartition) - /** - * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check - * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower - * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code - */ for (group <- groupMetadataCache.values) { if (partitionFor(group.groupId) == offsetsPartition) { onGroupUnloaded(group)
