Repository: kafka
Updated Branches:
  refs/heads/0.10.1 9ffadbfdc -> 0e20e5fbf


KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit

Author: Alexey Ozeritsky <aozerit...@yandex-team.ru>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2125 from resetius/KAFKA-4399

(cherry picked from commit ea370be518a783f3a5d8d834f78c82e36bf968b3)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e20e5fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e20e5fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e20e5fb

Branch: refs/heads/0.10.1
Commit: 0e20e5fbfe683257882f0426357b6c2e75bcacca
Parents: 9ffadbf
Author: Alexey Ozeritsky <aozerit...@yandex-team.ru>
Authored: Thu Dec 1 19:09:06 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Dec 1 19:57:12 2016 -0800

----------------------------------------------------------------------
 .../coordinator/GroupMetadataManager.scala      | 106 +++++++++----------
 1 file changed, 48 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0e20e5fb/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index feedc45..fafc39c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -136,43 +136,6 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
-  /**
-   * Remove the group from the cache and delete all metadata associated with 
it. This should be
-   * called only after all offsets for the group have expired and no members 
are remaining (i.e.
-   * it is in the Empty state).
-   */
-  private def evictGroupAndDeleteMetadata(group: GroupMetadata) {
-    // guard this removal in case of concurrent access (e.g. if a delayed join 
completes with no members
-    // while the group is being removed due to coordinator emigration). We 
also avoid writing the tombstone
-    // when the generationId is 0, since this group is only using Kafka for 
offset storage.
-    if (groupMetadataCache.remove(group.groupId, group) && group.generationId 
> 0) {
-      // Append the tombstone messages to the partition. It is okay if the 
replicas don't receive these (say,
-      // if we crash or leaders move) since the new leaders will still expire 
the consumers with heartbeat and
-      // retry removing this group.
-      val groupPartition = partitionFor(group.groupId)
-      getMessageFormatVersionAndTimestamp(groupPartition).foreach { case 
(magicValue, timestamp) =>
-        val tombstone = new Message(bytes = null, key = 
GroupMetadataManager.groupMetadataKey(group.groupId),
-          timestamp = timestamp, magicValue = magicValue)
-
-        val partitionOpt = 
replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
-        partitionOpt.foreach { partition =>
-          val appendPartition = 
TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
-
-          trace("Marking group %s as deleted.".format(group.groupId))
-
-          try {
-            // do not need to require acks since even if the tombstone is lost,
-            // it will be appended again by the new leader
-            partition.appendMessagesToLeader(new 
ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
-          } catch {
-            case t: Throwable =>
-              error("Failed to mark group %s as deleted in 
%s.".format(group.groupId, appendPartition), t)
-            // ignore and continue
-          }
-        }
-      }
-    }
-  }
 
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
@@ -598,7 +561,7 @@ class GroupMetadataManager(val brokerId: Int,
     val startMs = time.milliseconds()
     var offsetsRemoved = 0
 
-    groupMetadataCache.foreach { case (groupId, group) =>
+    val result = groupMetadataCache.flatMap { case (groupId, group) =>
       group synchronized {
         if (!group.is(Dead)) {
           val offsetsPartition = partitionFor(groupId)
@@ -612,32 +575,59 @@ class GroupMetadataManager(val brokerId: Int,
                 new Message(bytes = null, key = commitKey, timestamp = 
timestamp, magicValue = magicValue)
               }.toBuffer
 
-              val partitionOpt = 
replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-              partitionOpt.foreach { partition =>
-                val appendPartition = 
TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-                trace("Marked %d offsets in %s for 
deletion.".format(tombstones.size, appendPartition))
-
-                try {
-                  // do not need to require acks since even if the tombstone 
is lost,
-                  // it will be appended again in the next purge cycle
-                  partition.appendMessagesToLeader(new 
ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
-                  offsetsRemoved += tombstones.size
-                }
-                catch {
-                  case t: Throwable =>
-                    error("Failed to mark %d expired offsets for deletion in 
%s.".format(tombstones.size, appendPartition), t)
-                  // ignore and continue
-                }
-              }
+              val numOffsetsExpired = tombstones.size
 
               if (group.is(Empty) && !group.hasOffsets) {
                 group.transitionTo(Dead)
-                evictGroupAndDeleteMetadata(group)
-                info("Group %s generation %s is dead and 
removed".format(group.groupId, group.generationId))
+
+                // We avoid writing the tombstone
+                // when the generationId is 0, since this group is only using 
Kafka for offset storage.
+                if (groupMetadataCache.get(groupId) == group && 
group.generationId > 0) {
+                  // Append the tombstone messages to the partition. It is 
okay if the replicas don't receive these (say,
+                  // if we crash or leaders move) since the new leaders will 
still expire the consumers with heartbeat and
+                  // retry removing this group.
+
+                  trace("Marking group %s as deleted.".format(groupId))
+
+                  tombstones += new Message(bytes = null, key = 
GroupMetadataManager.groupMetadataKey(groupId),
+                    timestamp = timestamp, magicValue = magicValue)
+                }
               }
 
+              Some((group, offsetsPartition, tombstones, numOffsetsExpired))
             case None =>
-              info("BrokerId %d is no longer a coordinator for the group %s. 
Proceeding cleanup for other alive groups".format(brokerId, group.groupId))
+              info("BrokerId %d is no longer a coordinator for the group %s. 
Proceeding cleanup for other alive groups".format(brokerId, groupId))
+              None
+          }
+        } else {
+          None
+        }
+      }
+    }
+
+    for ((group, offsetsPartition, tombstones, numOffsetsExpired) <- result) {
+      val partitionOpt = 
replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+
+      partitionOpt.foreach { partition =>
+        val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, 
offsetsPartition)
+        trace("Marked %d offsets in %s for 
deletion.".format(numOffsetsExpired, appendPartition))
+
+        try {
+          // do not need to require acks since even if the tombstone is lost,
+          // it will be appended again in the next purge cycle
+          partition.appendMessagesToLeader(new 
ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+          offsetsRemoved += numOffsetsExpired
+        }
+        catch {
+          case t: Throwable =>
+            error(s"Failed to write ${tombstones.size} tombstones for group 
${group.groupId} to $appendPartition.", t)
+          // ignore and continue
+        }
+
+        group synchronized {
+          if (group.is(Dead)) {
+            groupMetadataCache.remove(group.groupId, group)
+            info("Group %s generation %s is dead and 
removed".format(group.groupId, group.generationId))
           }
         }
       }

Reply via email to