[ https://issues.apache.org/jira/browse/KAFKA-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212383#comment-16212383 ]
Ben Corlett commented on KAFKA-5970: ------------------------------------ Unfortunately we've had another incident today on broker 125 after applying the changes from pull request 3956. You can see the changes here https://github.com/corlettb/kafka/commits/deadlock. {noformat} Found one Java-level deadlock: ============================= "executor-Heartbeat": waiting to lock monitor 0x00007fbd8c1834c8 (object 0x000000068cccb590, a kafka.coordinator.group.GroupMetadata), which is held by "kafka-request-handler-7" "kafka-request-handler-7": waiting to lock monitor 0x00007fbe1942f698 (object 0x000000068cd2c420, a kafka.coordinator.group.GroupMetadata), which is held by "kafka-request-handler-4" "kafka-request-handler-4": waiting to lock monitor 0x00007fbd8c1834c8 (object 0x000000068cccb590, a kafka.coordinator.group.GroupMetadata), which is held by "kafka-request-handler-7" Java stack information for the threads listed above: =================================================== "executor-Heartbeat": at kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:776) - waiting to lock <0x000000068cccb590> (a kafka.coordinator.group.GroupMetadata) at kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34) at kafka.server.DelayedOperation.run(DelayedOperation.scala:120) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) "kafka-request-handler-7": at kafka.coordinator.group.GroupMetadataManager.putCacheCallback$2(GroupMetadataManager.scala:311) - waiting to lock <0x000000068cd2c420> (a kafka.coordinator.group.GroupMetadata) at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10(GroupMetadataManager.scala:380) at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10$adapted(GroupMetadataManager.scala:380) at kafka.coordinator.group.GroupMetadataManager$$Lambda$1045/747223912.apply(Unknown Source) at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:124) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:68) at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:106) at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:107) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:347) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:253) at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250) at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418) at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500) at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:545) at kafka.server.ReplicaManager$$Lambda$909/475609331.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234) at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source) at scala.collection.immutable.Map$Map1.foreach(Map.scala:120) at scala.collection.TraversableLike.map(TraversableLike.scala:234) at scala.collection.TraversableLike.map$(TraversableLike.scala:227) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:531) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:373) at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:245) at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:380) at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465) - locked <0x000000068cccb590> (a kafka.coordinator.group.GroupMetadata) at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429) at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361) at kafka.server.KafkaApis.handle(KafkaApis.scala:105) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66) at java.lang.Thread.run(Thread.java:748) "kafka-request-handler-4": at kafka.coordinator.group.GroupMetadataManager.putCacheCallback$2(GroupMetadataManager.scala:311) - waiting to lock <0x000000068cccb590> (a kafka.coordinator.group.GroupMetadata) at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10(GroupMetadataManager.scala:380) at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10$adapted(GroupMetadataManager.scala:380) at kafka.coordinator.group.GroupMetadataManager$$Lambda$1045/747223912.apply(Unknown Source) at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:124) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:68) at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:106) at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:107) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:347) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:253) at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250) at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418) at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500) at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:545) at kafka.server.ReplicaManager$$Lambda$909/475609331.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234) at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source) at scala.collection.immutable.Map$Map1.foreach(Map.scala:120) at scala.collection.TraversableLike.map(TraversableLike.scala:234) at scala.collection.TraversableLike.map$(TraversableLike.scala:227) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:531) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:373) at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:245) at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:380) at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465) - locked <0x000000068cd2c420> (a kafka.coordinator.group.GroupMetadata) at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429) at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361) at kafka.server.KafkaApis.handle(KafkaApis.scala:105) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66) at java.lang.Thread.run(Thread.java:748) Found 1 deadlock. {noformat} > Deadlock due to locking of DelayedProduce and group > --------------------------------------------------- > > Key: KAFKA-5970 > URL: https://issues.apache.org/jira/browse/KAFKA-5970 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Blocker > Fix For: 1.0.0, 0.11.0.2 > > Attachments: jstack.txt > > > From a local run of TransactionsBounceTest. Looks like we hold group lock > while completing DelayedProduce, which in turn may acquire group lock. > {quote} > Found one Java-level deadlock: > ============================= > "kafka-request-handler-7": > waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a > kafka.coordinator.group.GroupMetadata), > which is held by "kafka-request-handler-4" > "kafka-request-handler-4": > waiting to lock monitor 0x00007fe0869e4408 (object 0x0000000749be7bb8, a > kafka.server.DelayedProduce), > which is held by "kafka-request-handler-3" > "kafka-request-handler-3": > waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a > kafka.coordinator.group.GroupMetadata), > which is held by "kafka-request-handler-4" > Java stack information for the threads listed above: > =================================================== > "kafka-request-handler-7": > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752) > waiting to lock <0x000000074a9fbc50> (a > kafka.coordinator.group.GroupMetadata) > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750) > at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) > at > kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750) > at > kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556) > at > kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614) > at > kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614) > at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66) > at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116) > at > kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76) > locked <0x000000074b21c968> (a kafka.server.DelayedProduce) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244) > at > kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284) > at > kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434) > at > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:285) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1290) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286) > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598) > at kafka.server.KafkaApis.handle(KafkaApis.scala:100) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) > at java.lang.Thread.run(Thread.java:748) > "kafka-request-handler-4": > at > kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75) > waiting to lock <0x0000000749be7bb8> (a kafka.server.DelayedProduce) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244) > at > kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284) > at > kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:516) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:707) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:691) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:691) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:441) > at > kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:240) > at > kafka.coordinator.group.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:228) > at > kafka.coordinator.group.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:731) > locked <0x000000074a9fbc50> (a > kafka.coordinator.group.GroupMetadata) > at > kafka.coordinator.group.DelayedJoin.onComplete(DelayedJoin.scala:44) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66) > at > kafka.coordinator.group.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:42) > at > kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:708) > locked <0x000000074a9fbc50> (a > kafka.coordinator.group.GroupMetadata) > at > kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:42) > at > kafka.coordinator.group.DelayedJoin.safeTryComplete(DelayedJoin.scala:40) > at > kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:199) > at > kafka.coordinator.group.GroupCoordinator.prepareRebalance(GroupCoordinator.scala:693) > at > kafka.coordinator.group.GroupCoordinator.kafka$coordinator$group$GroupCoordinator$$maybePrepareRebalance(GroupCoordinator.scala:668) > locked <0x000000074a9fbc50> (a > kafka.coordinator.group.GroupMetadata) > at > kafka.coordinator.group.GroupCoordinator.removeMemberAndUpdateGroup(GroupCoordinator.scala:700) > at > kafka.coordinator.group.GroupCoordinator.handleLeaveGroup(GroupCoordinator.scala:324) > locked <0x000000074a9fbc50> (a > kafka.coordinator.group.GroupMetadata) > at > kafka.server.KafkaApis.handleLeaveGroupRequest(KafkaApis.scala:1259) > at kafka.server.KafkaApis.handle(KafkaApis.scala:112) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) > at java.lang.Thread.run(Thread.java:748) > "kafka-request-handler-3": > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752) > waiting to lock <0x000000074a9fbc50> (a > kafka.coordinator.group.GroupMetadata) > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750) > at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) > at > kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750) > at > kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556) > at > kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614) > at > kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614) > at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66) > at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116) > at > kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76) > locked <0x0000000749be7bb8> (a kafka.server.DelayedProduce) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244) > at > kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1294) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286) > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598) > at kafka.server.KafkaApis.handle(KafkaApis.scala:100) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) > at java.lang.Thread.run(Thread.java:748) > Found 1 deadlock. > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)