[ https://issues.apache.org/jira/browse/KAFKA-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481159#comment-16481159 ]

ASF GitHub Bot commented on KAFKA-6917:
---------------------------------------

hachikuji closed pull request #5036: KAFKA-6917: Process txn completion asynchronously to avoid deadlock
URL: https://github.com/apache/kafka/pull/5036
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index cbbd91396b2..9748e174c78 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -81,8 +81,7 @@ class GroupCoordinator(val brokerId: Int,
    */
   def startup(enableMetadataExpiration: Boolean = true) {
     info("Starting up.")
-    if (enableMetadataExpiration)
-      groupManager.enableMetadataExpiration()
+    groupManager.startup(enableMetadataExpiration)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -485,12 +484,12 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleTxnCompletion(producerId: Long,
-                          offsetsPartitions: Iterable[TopicPartition],
-                          transactionResult: TransactionResult) {
+  def scheduleHandleTxnCompletion(producerId: Long,
+                                  offsetsPartitions: Iterable[TopicPartition],
+                                  transactionResult: TransactionResult) {
     require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
     val isCommit = transactionResult == TransactionResult.COMMIT
-    groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+    groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
   private def doCommitOffsets(group: GroupMetadata,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 81ce8d512e5..c31735b75c8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -135,13 +135,14 @@ class GroupMetadataManager(brokerId: Int,
       })
     })
 
-  def enableMetadataExpiration() {
+  def startup(enableMetadataExpiration: Boolean) {
     scheduler.startup()
-
-    scheduler.schedule(name = "delete-expired-group-metadata",
-      fun = cleanupGroupMetadata,
-      period = config.offsetsRetentionCheckIntervalMs,
-      unit = TimeUnit.MILLISECONDS)
+    if (enableMetadataExpiration) {
+      scheduler.schedule(name = "delete-expired-group-metadata",
+        fun = cleanupGroupMetadata,
+        period = config.offsetsRetentionCheckIntervalMs,
+        unit = TimeUnit.MILLISECONDS)
+    }
   }
 
   def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
@@ -793,7 +794,20 @@ class GroupMetadataManager(brokerId: Int,
     offsetsRemoved
   }
 
-  def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) {
+  /**
+   * Complete pending transactional offset commits of the groups of `producerId` from the provided
+   * `completedPartitions`. This method is invoked when a commit or abort marker is fully written
+   * to the log. It may be invoked when a group lock is held by the caller, for instance when delayed
+   * operations are completed while appending offsets for a group. Since we need to acquire one or
+   * more group metadata locks to handle transaction completion, this operation is scheduled on
+   * the scheduler thread to avoid deadlocks.
+   */
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
+    scheduler.schedule(s"handleTxnCompletion-$producerId", () =>
+      handleTxnCompletion(producerId, completedPartitions, isCommit))
+  }
+
+  private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
     val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
     pendingGroups.foreach { case (groupId) =>
       getGroup(groupId) match {
@@ -802,7 +816,7 @@ class GroupMetadataManager(brokerId: Int,
             group.completePendingTxnOffsetCommit(producerId, isCommit)
             removeProducerGroup(producerId, groupId)
           }
-       }
+        }
         case _ =>
           info(s"Group $groupId has moved away from $brokerId after transaction marker was written but before the " +
             s"cache was updated. The cache on the new group owner will be updated instead.")
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 828b08bfe1d..5532bf1e9a8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1630,7 +1630,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // as soon as the end transaction marker has been written for a transactional offset commit,
         // call to the group coordinator to materialize the offsets into the cache
         try {
-          groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
+          groupCoordinator.scheduleHandleTxnCompletion(producerId, successfulOffsetsPartitions, result)
         } catch {
           case e: Exception =>
             error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 44e13560b00..befd22ac69e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -27,7 +27,7 @@ import kafka.server.{ DelayedOperationPurgatory, KafkaConfig }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ JoinGroupRequest, TransactionResult }
+import org.apache.kafka.common.requests.JoinGroupRequest
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{ After, Before, Test }
@@ -117,7 +117,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn)
   }
 
-
   abstract class GroupOperation[R, C] extends Operation {
     val responseFutures = new ConcurrentHashMap[GroupMember, Future[R]]()
 
@@ -228,8 +227,17 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
       val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
       val producerId = 1000L
       val producerEpoch : Short = 2
+      // When transaction offsets are appended to the log, transactions may be scheduled for
+      // completion. Since group metadata locks are acquired for transaction completion, include
+      // this in the callback to test that there are no deadlocks.
+      def callbackWithTxnCompletion(errors: Map[TopicPartition, Errors]): Unit = {
+        val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
+        groupCoordinator.groupManager.scheduleHandleTxnCompletion(producerId,
+          offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+        responseCallback(errors)
+      }
       groupCoordinator.handleTxnCommitOffsets(member.group.groupId,
-          producerId, producerEpoch, offsets, responseCallback)
+          producerId, producerEpoch, offsets, callbackWithTxnCompletion)
     }
   }
 
@@ -241,19 +249,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.handleTxnCompletion(producerId, offsetsPartitions, transactionResult(member.group.groupId))
+      groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
       responseCallback(Errors.NONE)
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
       val error = await(member, 500)
       assertEquals(Errors.NONE, error)
     }
-    // Test both commit and abort. Group ids used in the test have the format <prefix><index>
-    // Use the last digit of the index to decide between commit and abort.
-    private def transactionResult(groupId: String): TransactionResult = {
-      val lastDigit = groupId(groupId.length - 1).toInt
-      if (lastDigit % 2 == 0) TransactionResult.COMMIT else TransactionResult.ABORT
-    }
   }
 
   class LeaveGroupOperation extends GroupOperation[LeaveGroupCallbackParams, LeaveGroupCallback] {
@@ -273,7 +276,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
 
 object GroupCoordinatorConcurrencyTest {
 
-
   type JoinGroupCallback = JoinGroupResult => Unit
   type SyncGroupCallbackParams = (Array[Byte], Errors)
   type SyncGroupCallback = (Array[Byte], Errors) => Unit
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 933e91bfcec..608d7cc997f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -935,7 +935,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Send commit marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     // Validate that committed offset is materialized.
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -960,7 +960,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Validate that the pending commit is discarded.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
@@ -982,14 +982,14 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset))
 
     // Ignore spurious commit.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, thirdReqError)
@@ -1026,7 +1026,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -1052,7 +1052,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(1)).map(_.offset))
 
     // Now we receive the other marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
     errors.clear()
     partitionData.clear()
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
@@ -1102,7 +1102,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // producer0 commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -1117,7 +1117,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset))
 
     // producer1 now commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
 
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
@@ -1723,4 +1723,11 @@ class GroupCoordinatorTest extends JUnitSuite {
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
+  def handleTxnCompletion(producerId: Long,
+                          offsetsPartitions: Iterable[TopicPartition],
+                          transactionResult: TransactionResult): Unit = {
+    val isCommit = transactionResult == TransactionResult.COMMIT
+    groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+  }
+
 }
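
For readers skimming the diff: the essence of the change is that transaction-completion work, which has to take one or more group metadata locks, is no longer run inline on the request-handler thread (where a group lock may already be held via a completed delayed produce) but is handed off to the group metadata manager's scheduler thread. A minimal, self-contained Scala sketch of that hand-off pattern follows; it uses plain java.util.concurrent primitives and hypothetical names rather than Kafka's internal classes.

import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{Executors, TimeUnit}

// Sketch only: every name here is hypothetical and not part of the Kafka code base.
object TxnCompletionHandoffSketch {
  private val groupLocks = Map("groupA" -> new ReentrantLock, "groupB" -> new ReentrantLock)
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Runs on the dedicated scheduler thread, so it can take group locks freely:
  // this thread never starts out already holding one.
  private def handleTxnCompletion(groupIds: Set[String], isCommit: Boolean): Unit =
    groupIds.foreach { groupId =>
      val lock = groupLocks(groupId)
      lock.lock()
      try println(s"materializing ${if (isCommit) "commit" else "abort"} for $groupId")
      finally lock.unlock()
    }

  // Analogue of scheduleHandleTxnCompletion: called on the request path, it only
  // enqueues the work and never acquires a group lock on the calling thread.
  def scheduleHandleTxnCompletion(groupIds: Set[String], isCommit: Boolean): Unit =
    scheduler.submit(new Runnable {
      override def run(): Unit = handleTxnCompletion(groupIds, isCommit)
    })

  def main(args: Array[String]): Unit = {
    scheduleHandleTxnCompletion(Set("groupA", "groupB"), isCommit = true)
    scheduler.shutdown()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
  }
}

The patch itself achieves the same effect with the existing scheduler inside GroupMetadataManager (see scheduleHandleTxnCompletion above), so the request handler returns immediately after enqueueing the completion.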


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Request handler deadlocks attempting to acquire group metadata lock
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6917
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6917
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 2.0.0, 1.0.2, 1.1.1
>
>
> We have noticed another deadlock with the group metadata lock with version 1.1.
> {quote}
> Found one Java-level deadlock:
> =============================
> "executor-Heartbeat":
>   waiting for ownable synchronizer 0x00000005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting for ownable synchronizer 0x00000005cbe7f698, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting for ownable synchronizer 0x00000005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> Java stack information for the threads listed above:
> ===================================================
> "executor-Heartbeat":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000005ce477080> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>         at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>         at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at 
> kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
>         at 
> kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
>         at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000005cbe7f698> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>         at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>         at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
>         at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at 
> kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
>         at 
> kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
>         at 
> kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>         at 
> kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>         at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
>         at 
> kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
>         at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
>         at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
>         at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:294)
>         at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:498)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:592)
>         at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:744)
>         at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:728)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>         at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:728)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:469)
>         at 
> kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:285)
>         at 
> kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:428)
>         at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:512)
>         at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>         at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at 
> kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:506)
>         at 
> kafka.coordinator.group.GroupCoordinator.handleTxnCommitOffsets(GroupCoordinator.scala:460)
>         at 
> kafka.server.KafkaApis.handleTxnOffsetCommitRequest(KafkaApis.scala:1833)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:139)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-4":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000005ce477080> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>         at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>         at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
>         at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at 
> kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
>         at 
> kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
>         at 
> kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>         at 
> kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>         at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
>         at 
> kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
>         at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
>         at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
>         at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:294)
>         at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:498)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:592)
>         at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:744)
>         at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:728)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>         at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:728)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:469)
>         at 
> kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:285)
>         at 
> kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:428)
>         at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:512)
>         at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>         at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:507)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at 
> kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:506)
>         at 
> kafka.coordinator.group.GroupCoordinator.handleTxnCommitOffsets(GroupCoordinator.scala:460)
>         at 
> kafka.server.KafkaApis.handleTxnOffsetCommitRequest(KafkaApis.scala:1833)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:139)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:748)
> {quote}
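
To make the cycle in the trace concrete: both request handlers are inside doCommitOffsets holding their own group's lock when the offset append completes a delayed produce, which then calls handleTxnCompletion inline and tries to take the other group's lock. The sketch below reproduces just that ordering with plain ReentrantLocks and hypothetical names; the sleep only makes the problematic interleaving deterministic.

import java.util.concurrent.locks.ReentrantLock

// Sketch only: a minimal reproduction of the lock cycle described in the trace above.
object GroupLockCycleSketch {
  val lockA = new ReentrantLock // stands in for the group lock held by "kafka-request-handler-3"
  val lockB = new ReentrantLock // stands in for the group lock held by "kafka-request-handler-4"

  // Holds its own group's lock (as doCommitOffsets does) and then, still inside that
  // critical section, needs the other group's lock (as the inline handleTxnCompletion did).
  def commitOffsetsThenCompleteTxn(own: ReentrantLock, other: ReentrantLock): Unit = {
    own.lock()
    try {
      Thread.sleep(100) // let the other thread grab its own lock first
      other.lock()      // blocks forever: the other thread already holds this lock
      other.unlock()
    } finally own.unlock()
  }

  def main(args: Array[String]): Unit = {
    val t3 = new Thread(new Runnable {
      override def run(): Unit = commitOffsetsThenCompleteTxn(lockA, lockB)
    }, "request-handler-3")
    val t4 = new Thread(new Runnable {
      override def run(): Unit = commitOffsetsThenCompleteTxn(lockB, lockA)
    }, "request-handler-4")
    t3.start()
    t4.start() // the two threads now deadlock on each other, as in the trace
  }
}

With the fix above, the second lock acquisition is moved onto the scheduler thread, which never holds a group lock when it starts the work, so this cycle cannot form.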



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
