[ 
https://issues.apache.org/jira/browse/KAFKA-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400019#comment-16400019
 ] 

ASF GitHub Bot commented on KAFKA-6653:
---------------------------------------

hachikuji closed pull request #4704: KAFKA-6653: Complete delayed operations 
even when there is lock contention
URL: https://github.com/apache/kafka/pull/4704
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala 
b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 894d30e2d7c..2a096e1a811 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -47,6 +47,7 @@ abstract class DelayedOperation(override val delayMs: Long,
     lockOpt: Option[Lock] = None) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
+  private val tryCompletePending = new AtomicBoolean(false)
   // Visible for testing
   private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
 
@@ -101,16 +102,38 @@ abstract class DelayedOperation(override val delayMs: 
Long,
   /**
    * Thread-safe variant of tryComplete() that attempts completion only if the 
lock can be acquired
    * without blocking.
+   *
+   * If threadA acquires the lock and performs the check for completion before 
completion criteria is met
+   * and threadB satisfies the completion criteria, but fails to acquire the 
lock because threadA has not
+   * yet released the lock, we need to ensure that completion is attempted 
again without blocking threadA
+   * or threadB. `tryCompletePending` is set by threadB when it fails to 
acquire the lock and at least one
+   * of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
+   * every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
+   * the operation is actually completed.
    */
   private[server] def maybeTryComplete(): Boolean = {
-    if (lock.tryLock()) {
-      try {
-        tryComplete()
-      } finally {
-        lock.unlock()
+    var retry = false
+    var done = false
+    do {
+      if (lock.tryLock()) {
+        try {
+          tryCompletePending.set(false)
+          done = tryComplete()
+        } finally {
+          lock.unlock()
+        }
+        // While we were holding the lock, another thread may have invoked 
`maybeTryComplete` and set
+        // `tryCompletePending`. In this case we should retry.
+        retry = tryCompletePending.get()
+      } else {
+        // Another thread is holding the lock. If `tryCompletePending` is 
already set and this thread failed to
+        // acquire the lock, then the thread that is holding the lock is 
guaranteed to see the flag and retry.
+        // Otherwise, we should set the flag and retry on this thread since 
the thread holding the lock may have
+        // released the lock and returned by the time the flag is set.
+        retry = !tryCompletePending.getAndSet(true)
       }
-    } else
-      false
+    } while (!isCompleted && retry)
+    done
   }
 
   /*
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index d4d79e554c7..3b077a0b438 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -17,11 +17,13 @@
 
 package kafka.server
 
-import java.util.concurrent.{Executors, Future}
+import java.util.Random
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.utils.CoreUtils.inLock
-
+import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
@@ -29,6 +31,7 @@ import org.junit.Assert._
 class DelayedOperationTest {
 
   var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
+  var executorService: ExecutorService = null
 
   @Before
   def setUp() {
@@ -38,6 +41,8 @@ class DelayedOperationTest {
   @After
   def tearDown() {
     purgatory.shutdown()
+    if (executorService != null)
+      executorService.shutdown()
   }
 
   @Test
@@ -122,6 +127,94 @@ class DelayedOperationTest {
     assertEquals(Nil, cancelledOperations)
   }
 
+  /**
+    * Verify that if there is lock contention between two threads attempting 
to complete,
+    * completion is performed without any blocking in either thread.
+    */
+  @Test
+  def testTryCompleteLockContention(): Unit = {
+    executorService = Executors.newSingleThreadExecutor()
+    val completionAttemptsRemaining = new AtomicInteger(Int.MaxValue)
+    val tryCompleteSemaphore = new Semaphore(1)
+    val key = "key"
+
+    val op = new MockDelayedOperation(100000L, None, None) {
+      override def tryComplete() = {
+        val shouldComplete = completionAttemptsRemaining.decrementAndGet <= 0
+        tryCompleteSemaphore.acquire()
+        try {
+          if (shouldComplete)
+            forceComplete()
+          else
+            false
+        } finally {
+          tryCompleteSemaphore.release()
+        }
+      }
+    }
+
+    purgatory.tryCompleteElseWatch(op, Seq(key))
+    completionAttemptsRemaining.set(2)
+    tryCompleteSemaphore.acquire()
+    val future = runOnAnotherThread(purgatory.checkAndComplete(key), 
shouldComplete = false)
+    TestUtils.waitUntilTrue(() => tryCompleteSemaphore.hasQueuedThreads, "Not 
attempting to complete")
+    purgatory.checkAndComplete(key) // this should not block even though lock 
is not free
+    assertFalse("Operation should not have completed", op.isCompleted)
+    tryCompleteSemaphore.release()
+    future.get(10, TimeUnit.SECONDS)
+    assertTrue("Operation should have completed", op.isCompleted)
+  }
+
+  /**
+    * Test `tryComplete` with multiple threads to verify that there are no 
timing windows
+    * when completion is not performed even if the thread that makes the 
operation completable
+    * may not be able to acquire the operation lock. Since it is difficult to 
test all scenarios,
+    * this test uses random delays with a large number of threads.
+    */
+  @Test
+  def testTryCompleteWithMultipleThreads(): Unit = {
+    val executor = Executors.newScheduledThreadPool(20)
+    this.executorService = executor
+    val random = new Random
+    val maxDelayMs = 10
+    val completionAttempts = 20
+
+    class TestDelayOperation(index: Int) extends MockDelayedOperation(10000L) {
+      val key = s"key$index"
+      val completionAttemptsRemaining = new AtomicInteger(completionAttempts)
+
+      override def tryComplete(): Boolean = {
+        val shouldComplete = completable
+        Thread.sleep(random.nextInt(maxDelayMs))
+        if (shouldComplete)
+          forceComplete()
+        else
+          false
+      }
+    }
+    val ops = (0 until 100).map { index =>
+      val op = new TestDelayOperation(index)
+      purgatory.tryCompleteElseWatch(op, Seq(op.key))
+      op
+    }
+
+    def scheduleTryComplete(op: TestDelayOperation, delayMs: Long): Future[_] 
= {
+      executor.schedule(new Runnable {
+        override def run(): Unit = {
+          if (op.completionAttemptsRemaining.decrementAndGet() == 0)
+            op.completable = true
+          purgatory.checkAndComplete(op.key)
+        }
+      }, delayMs, TimeUnit.MILLISECONDS)
+    }
+
+    (1 to completionAttempts).flatMap { _ =>
+      ops.map { op => scheduleTryComplete(op, random.nextInt(maxDelayMs)) }
+    }.foreach { future => future.get }
+
+    ops.foreach { op => assertTrue("Operation should have completed", 
op.isCompleted) }
+  }
+
   @Test
   def testDelayedOperationLock() {
     verifyDelayedOperationLock(new MockDelayedOperation(100000L), 
mismatchedLocks = false)
@@ -141,102 +234,97 @@ class DelayedOperationTest {
 
   def verifyDelayedOperationLock(mockDelayedOperation: => 
MockDelayedOperation, mismatchedLocks: Boolean) {
     val key = "key"
-    val executorService = Executors.newSingleThreadExecutor
-    try {
-      def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
-        (1 to count).map { _ =>
-          val op = mockDelayedOperation
-          purgatory.tryCompleteElseWatch(op, Seq(key))
-          assertFalse("Not completable", op.isCompleted)
-          op
-        }
+    executorService = Executors.newSingleThreadExecutor
+    def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
+      (1 to count).map { _ =>
+        val op = mockDelayedOperation
+        purgatory.tryCompleteElseWatch(op, Seq(key))
+        assertFalse("Not completable", op.isCompleted)
+        op
       }
+    }
 
-      def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = 
{
-        (1 to count).map { _ =>
-          val op = mockDelayedOperation
-          op.completable = true
-          op
-        }
+    def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
+      (1 to count).map { _ =>
+        val op = mockDelayedOperation
+        op.completable = true
+        op
       }
+    }
 
-      def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] 
= {
-        val future = executorService.submit(new Runnable {
-          def run() = fun
-        })
-        if (shouldComplete)
-          future.get()
-        else
-          assertFalse("Should not have completed", future.isDone)
-        future
-      }
+    def checkAndComplete(completableOps: Seq[MockDelayedOperation], 
expectedComplete: Seq[MockDelayedOperation]): Unit = {
+      completableOps.foreach(op => op.completable = true)
+      val completed = purgatory.checkAndComplete(key)
+      assertEquals(expectedComplete.size, completed)
+      expectedComplete.foreach(op => assertTrue("Should have completed", 
op.isCompleted))
+      val expectedNotComplete = completableOps.toSet -- expectedComplete
+      expectedNotComplete.foreach(op => assertFalse("Should not have 
completed", op.isCompleted))
+    }
 
-      def checkAndComplete(completableOps: Seq[MockDelayedOperation], 
expectedComplete: Seq[MockDelayedOperation]): Unit = {
-        completableOps.foreach(op => op.completable = true)
-        val completed = purgatory.checkAndComplete(key)
-        assertEquals(expectedComplete.size, completed)
-        expectedComplete.foreach(op => assertTrue("Should have completed", 
op.isCompleted))
-        val expectedNotComplete = completableOps.toSet -- expectedComplete
-        expectedNotComplete.foreach(op => assertFalse("Should not have 
completed", op.isCompleted))
-      }
+    // If locks are free all completable operations should complete
+    var ops = createDelayedOperations(2)
+    checkAndComplete(ops, ops)
 
-      // If locks are free all completable operations should complete
-      var ops = createDelayedOperations(2)
+    // Lock held by current thread, completable operations should complete
+    ops = createDelayedOperations(2)
+    inLock(ops(1).lock) {
       checkAndComplete(ops, ops)
+    }
 
-      // Lock held by current thread, completable operations should complete
-      ops = createDelayedOperations(2)
-      inLock(ops(1).lock) {
-        checkAndComplete(ops, ops)
-      }
+    // Lock held by another thread, should not block, only operations that can 
be
+    // locked without blocking on the current thread should complete
+    ops = createDelayedOperations(2)
+    runOnAnotherThread(ops(0).lock.lock(), true)
+    try {
+      checkAndComplete(ops, Seq(ops(1)))
+    } finally {
+      runOnAnotherThread(ops(0).lock.unlock(), true)
+      checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+    }
 
-      // Lock held by another thread, should not block, only operations that 
can be
-      // locked without blocking on the current thread should complete
-      ops = createDelayedOperations(2)
-      runOnAnotherThread(ops(0).lock.lock(), true)
+    // Lock acquired by response callback held by another thread, should not 
block
+    // if the response lock is used as operation lock, only operations
+    // that can be locked without blocking on the current thread should 
complete
+    ops = createDelayedOperations(2)
+    ops(0).responseLockOpt.foreach { lock =>
+      runOnAnotherThread(lock.lock(), true)
       try {
-        checkAndComplete(ops, Seq(ops(1)))
-      } finally {
-        runOnAnotherThread(ops(0).lock.unlock(), true)
-        checkAndComplete(Seq(ops(0)), Seq(ops(0)))
-      }
-
-      // Lock acquired by response callback held by another thread, should not 
block
-      // if the response lock is used as operation lock, only operations
-      // that can be locked without blocking on the current thread should 
complete
-      ops = createDelayedOperations(2)
-      ops(0).responseLockOpt.foreach { lock =>
-        runOnAnotherThread(lock.lock(), true)
         try {
-          try {
-            checkAndComplete(ops, Seq(ops(1)))
-            assertFalse("Should have failed with mismatched locks", 
mismatchedLocks)
-          } catch {
-            case e: IllegalStateException =>
-              assertTrue("Should not have failed with valid locks", 
mismatchedLocks)
-          }
-        } finally {
-          runOnAnotherThread(lock.unlock(), true)
-          checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+          checkAndComplete(ops, Seq(ops(1)))
+          assertFalse("Should have failed with mismatched locks", 
mismatchedLocks)
+        } catch {
+          case e: IllegalStateException =>
+            assertTrue("Should not have failed with valid locks", 
mismatchedLocks)
         }
+      } finally {
+        runOnAnotherThread(lock.unlock(), true)
+        checkAndComplete(Seq(ops(0)), Seq(ops(0)))
       }
+    }
 
-      // Immediately completable operations should complete without locking
-      ops = createCompletableOperations(2)
-      ops.foreach { op =>
-        assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, 
Seq(key)))
-        assertTrue("Should have completed", op.isCompleted)
-      }
-
-    } finally {
-      executorService.shutdown()
+    // Immediately completable operations should complete without locking
+    ops = createCompletableOperations(2)
+    ops.foreach { op =>
+      assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, 
Seq(key)))
+      assertTrue("Should have completed", op.isCompleted)
     }
   }
 
+  private def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): 
Future[_] = {
+    val future = executorService.submit(new Runnable {
+      def run() = fun
+    })
+    if (shouldComplete)
+      future.get()
+    else
+      assertFalse("Should not have completed", future.isDone)
+    future
+  }
 
   class MockDelayedOperation(delayMs: Long,
-      lockOpt: Option[ReentrantLock] = None,
-      val responseLockOpt: Option[ReentrantLock] = None) extends 
DelayedOperation(delayMs, lockOpt) {
+                             lockOpt: Option[ReentrantLock] = None,
+                             val responseLockOpt: Option[ReentrantLock] = None)
+                             extends DelayedOperation(delayMs, lockOpt) {
     var completable = false
 
     def awaitExpiration() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Delayed operations may not be completed when there is lock contention
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-6653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6653
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.11.0.2, 1.1.0, 1.0.1
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> If there is lock contention while multiple threads check if a delayed 
> operation may be completed (e.g. a produce request with acks=-1), only the 
> thread that acquires the lock without blocking attempts to complete the 
> operation. This change was made to avoid deadlocks under KAFKA-5970. But this 
> leaves a timing window in which an operation becomes ready to complete after 
> another thread has acquired the lock and performed the check for completion, 
> but has not yet released the lock. In this case, the operation may never be 
> completed and will time out unless there are other operations with the same 
> key. The timeout was observed in a failed system test where a produce request 
> timed out, causing the test failure.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to