[ https://issues.apache.org/jira/browse/KAFKA-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481159#comment-16481159 ]
ASF GitHub Bot commented on KAFKA-6917:
---------------------------------------

hachikuji closed pull request #5036: KAFKA-6917: Process txn completion asynchronously to avoid deadlock
URL: https://github.com/apache/kafka/pull/5036

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index cbbd91396b2..9748e174c78 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -81,8 +81,7 @@ class GroupCoordinator(val brokerId: Int,
    */
   def startup(enableMetadataExpiration: Boolean = true) {
     info("Starting up.")
-    if (enableMetadataExpiration)
-      groupManager.enableMetadataExpiration()
+    groupManager.startup(enableMetadataExpiration)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -485,12 +484,12 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleTxnCompletion(producerId: Long,
-                          offsetsPartitions: Iterable[TopicPartition],
-                          transactionResult: TransactionResult) {
+  def scheduleHandleTxnCompletion(producerId: Long,
+                                  offsetsPartitions: Iterable[TopicPartition],
+                                  transactionResult: TransactionResult) {
     require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
     val isCommit = transactionResult == TransactionResult.COMMIT
-    groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+    groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
   private def doCommitOffsets(group: GroupMetadata,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 81ce8d512e5..c31735b75c8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -135,13 +135,14 @@ class GroupMetadataManager(brokerId: Int,
     })
   })
 
-  def enableMetadataExpiration() {
+  def startup(enableMetadataExpiration: Boolean) {
     scheduler.startup()
-
-    scheduler.schedule(name = "delete-expired-group-metadata",
-      fun = cleanupGroupMetadata,
-      period = config.offsetsRetentionCheckIntervalMs,
-      unit = TimeUnit.MILLISECONDS)
+    if (enableMetadataExpiration) {
+      scheduler.schedule(name = "delete-expired-group-metadata",
+        fun = cleanupGroupMetadata,
+        period = config.offsetsRetentionCheckIntervalMs,
+        unit = TimeUnit.MILLISECONDS)
+    }
   }
 
   def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
@@ -793,7 +794,20 @@ class GroupMetadataManager(brokerId: Int,
     offsetsRemoved
   }
 
-  def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) {
+  /**
+   * Complete pending transactional offset commits of the groups of `producerId` from the provided
+   * `completedPartitions`. This method is invoked when a commit or abort marker is fully written
+   * to the log. It may be invoked when a group lock is held by the caller, for instance when delayed
+   * operations are completed while appending offsets for a group. Since we need to acquire one or
+   * more group metadata locks to handle transaction completion, this operation is scheduled on
+   * the scheduler thread to avoid deadlocks.
+   */
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
+    scheduler.schedule(s"handleTxnCompletion-$producerId", () =>
+      handleTxnCompletion(producerId, completedPartitions, isCommit))
+  }
+
+  private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
     val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
     pendingGroups.foreach { case (groupId) =>
       getGroup(groupId) match {
@@ -802,7 +816,7 @@ class GroupMetadataManager(brokerId: Int,
             group.completePendingTxnOffsetCommit(producerId, isCommit)
             removeProducerGroup(producerId, groupId)
           }
-       }
+        }
         case _ =>
           info(s"Group $groupId has moved away from $brokerId after transaction marker was written but before the " +
             s"cache was updated. The cache on the new group owner will be updated instead.")
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 828b08bfe1d..5532bf1e9a8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1630,7 +1630,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           // as soon as the end transaction marker has been written for a transactional offset commit,
           // call to the group coordinator to materialize the offsets into the cache
           try {
-            groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
+            groupCoordinator.scheduleHandleTxnCompletion(producerId, successfulOffsetsPartitions, result)
           } catch {
             case e: Exception =>
               error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 44e13560b00..befd22ac69e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -27,7 +27,7 @@ import kafka.server.{ DelayedOperationPurgatory, KafkaConfig }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ JoinGroupRequest, TransactionResult }
+import org.apache.kafka.common.requests.JoinGroupRequest
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{ After, Before, Test }
@@ -117,7 +117,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn)
   }
 
-
   abstract class GroupOperation[R, C] extends Operation {
     val responseFutures = new ConcurrentHashMap[GroupMember, Future[R]]()
 
@@ -228,8 +227,17 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
       val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
       val producerId = 1000L
       val producerEpoch : Short = 2
+      // When transaction offsets are appended to the log, transactions may be scheduled for
+      // completion. Since group metadata locks are acquired for transaction completion, include
+      // this in the callback to test that there are no deadlocks.
+      def callbackWithTxnCompletion(errors: Map[TopicPartition, Errors]): Unit = {
+        val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
+        groupCoordinator.groupManager.scheduleHandleTxnCompletion(producerId,
+          offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+        responseCallback(errors)
+      }
       groupCoordinator.handleTxnCommitOffsets(member.group.groupId,
-        producerId, producerEpoch, offsets, responseCallback)
+        producerId, producerEpoch, offsets, callbackWithTxnCompletion)
     }
   }
 
@@ -241,19 +249,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.handleTxnCompletion(producerId, offsetsPartitions, transactionResult(member.group.groupId))
+      groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
       responseCallback(Errors.NONE)
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
       val error = await(member, 500)
       assertEquals(Errors.NONE, error)
     }
-    // Test both commit and abort. Group ids used in the test have the format <prefix><index>
-    // Use the last digit of the index to decide between commit and abort.
-    private def transactionResult(groupId: String): TransactionResult = {
-      val lastDigit = groupId(groupId.length - 1).toInt
-      if (lastDigit % 2 == 0) TransactionResult.COMMIT else TransactionResult.ABORT
-    }
   }
 
   class LeaveGroupOperation extends GroupOperation[LeaveGroupCallbackParams, LeaveGroupCallback] {
@@ -273,7 +276,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
 
 object GroupCoordinatorConcurrencyTest {
 
-
   type JoinGroupCallback = JoinGroupResult => Unit
   type SyncGroupCallbackParams = (Array[Byte], Errors)
   type SyncGroupCallback = (Array[Byte], Errors) => Unit
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 933e91bfcec..608d7cc997f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -935,7 +935,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Send commit marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     // Validate that committed offset is materialized.
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -960,7 +960,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Validate that the pending commit is discarded.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
@@ -982,14 +982,14 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset))
 
     // Ignore spurious commit.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, thirdReqError)
@@ -1026,7 +1026,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -1052,7 +1052,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(1)).map(_.offset))
 
     // Now we receive the other marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
     errors.clear()
     partitionData.clear()
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
@@ -1102,7 +1102,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // producer0 commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -1117,7 +1117,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset))
 
     // producer1 now commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
 
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
@@ -1723,4 +1723,11 @@ class GroupCoordinatorTest extends JUnitSuite {
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
+  def handleTxnCompletion(producerId: Long,
+                          offsetsPartitions: Iterable[TopicPartition],
+                          transactionResult: TransactionResult): Unit = {
+    val isCommit = transactionResult == TransactionResult.COMMIT
+    groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+  }
+
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Request handler deadlocks attempting to acquire group metadata lock
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6917
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6917
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 2.0.0, 1.0.2, 1.1.1
>
>
> We have noticed another deadlock with the group metadata lock with version 1.1.
> {quote}
> Found one Java-level deadlock:
> =============================
> "executor-Heartbeat":
>   waiting for ownable synchronizer 0x00000005ce477080, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting for ownable synchronizer 0x00000005cbe7f698, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting for ownable synchronizer 0x00000005ce477080, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> Java stack information for the threads listed above:
> ===================================================
> "executor-Heartbeat":
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for <0x00000005ce477080> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>   at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>   at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>   at kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
>   at kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
>   at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for <0x00000005cbe7f698> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>   at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>   at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>   at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
>   at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
>   at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
>   at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
>   at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>   at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>   at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>   at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>   at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
>   at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
>   at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
>   at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
>   at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:294)
>   at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:498)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:592)
>   at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:744)
>   at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:728)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:728)
>   at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:469)
>   at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:285)
>   at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:428)
>   at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:512)
>   at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>   at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>   at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>   at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:506)
>   at kafka.coordinator.group.GroupCoordinator.handleTxnCommitOffsets(GroupCoordinator.scala:460)
>   at kafka.server.KafkaApis.handleTxnOffsetCommitRequest(KafkaApis.scala:1833)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:139)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-4":
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for <0x00000005ce477080> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>   at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>   at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>   at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
>   at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
>   at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
>   at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
>   at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>   at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>   at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>   at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>   at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
>   at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
>   at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
>   at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
>   at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:294)
>   at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:498)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:592)
>   at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:744)
>   at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:728)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:728)
>   at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:469)
>   at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:285)
>   at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:428)
>   at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:512)
>   at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>   at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>   at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>   at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:506)
>   at kafka.coordinator.group.GroupCoordinator.handleTxnCommitOffsets(GroupCoordinator.scala:460)
>   at kafka.server.KafkaApis.handleTxnOffsetCommitRequest(KafkaApis.scala:1833)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:139)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> {quote}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
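
The essence of the fix above: GroupMetadataManager.handleTxnCompletion has to take the lock of every group whose pending transactional offsets it materializes, so it must not run inline from the write-txn-marker append callback on a request-handler thread that may already hold a different group's lock while committing offsets; that is the circular wait between kafka-request-handler-3 and kafka-request-handler-4 in the stack traces. The new scheduleHandleTxnCompletion defers the work to the group metadata manager's scheduler thread, which holds no group lock when it runs. The following is a minimal, self-contained sketch of that pattern only; the names (GroupLockTable, TxnCompletionHandler) are illustrative stand-ins rather than the actual Kafka classes, and a plain ScheduledExecutorService stands in for Kafka's scheduler:

{code:scala}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
import scala.collection.concurrent.TrieMap

// Stand-in for the per-group metadata locks (GroupMetadata.inLock in Kafka).
final class GroupLockTable {
  private val locks = TrieMap.empty[String, ReentrantLock]

  def inLock[T](groupId: String)(body: => T): T = {
    val lock = locks.getOrElseUpdate(groupId, new ReentrantLock())
    lock.lock()
    try body finally lock.unlock()
  }
}

// Sketch of the deadlock-avoidance pattern: the lock-acquiring work is handed
// to a dedicated scheduler thread instead of running inline in the append
// callback, so a request-handler thread that already holds one group's lock
// never blocks waiting for another group's lock.
final class TxnCompletionHandler(lockTable: GroupLockTable) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Conceptually what handleTxnCompletion does: lock each affected group and
  // materialize (or discard) its pending transactional offsets.
  private def handleTxnCompletion(producerId: Long, groupIds: Set[String], isCommit: Boolean): Unit =
    groupIds.foreach { groupId =>
      lockTable.inLock(groupId) {
        println(s"group=$groupId producerId=$producerId commit=$isCommit")
      }
    }

  // The fix: defer instead of invoking inline while other group locks may be held.
  def scheduleHandleTxnCompletion(producerId: Long, groupIds: Set[String], isCommit: Boolean): Unit =
    scheduler.schedule(new Runnable {
      override def run(): Unit = handleTxnCompletion(producerId, groupIds, isCommit)
    }, 0L, TimeUnit.MILLISECONDS)

  def shutdown(): Unit = scheduler.shutdown()
}
{code}

In Kafka itself the deferral goes through the group metadata manager's existing scheduler and the task name carries the producer id, as the diff shows; the sketch only illustrates the ordering property that breaks the cycle: group locks are acquired on a thread that holds no other group lock at the time.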