[ https://issues.apache.org/jira/browse/KAFKA-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400019#comment-16400019 ]
ASF GitHub Bot commented on KAFKA-6653: --------------------------------------- hachikuji closed pull request #4704: KAFKA-6653: Complete delayed operations even when there is lock contention URL: https://github.com/apache/kafka/pull/4704 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 894d30e2d7c..2a096e1a811 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -47,6 +47,7 @@ abstract class DelayedOperation(override val delayMs: Long, lockOpt: Option[Lock] = None) extends TimerTask with Logging { private val completed = new AtomicBoolean(false) + private val tryCompletePending = new AtomicBoolean(false) // Visible for testing private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock) @@ -101,16 +102,38 @@ abstract class DelayedOperation(override val delayMs: Long, /** * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired * without blocking. + * + * If threadA acquires the lock and performs the check for completion before completion criteria is met + * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not + * yet released the lock, we need to ensure that completion is attempted again without blocking threadA + * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one + * of threadA or threadB will attempt completion of the operation if this flag is set. 
This ensures that + * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until + * the operation is actually completed. */ private[server] def maybeTryComplete(): Boolean = { - if (lock.tryLock()) { - try { - tryComplete() - } finally { - lock.unlock() + var retry = false + var done = false + do { + if (lock.tryLock()) { + try { + tryCompletePending.set(false) + done = tryComplete() + } finally { + lock.unlock() + } + // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set + // `tryCompletePending`. In this case we should retry. + retry = tryCompletePending.get() + } else { + // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to + // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry. + // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have + // released the lock and returned by the time the flag is set. 
+ retry = !tryCompletePending.getAndSet(true) } - } else - false + } while (!isCompleted && retry) + done } /* diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index d4d79e554c7..3b077a0b438 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -17,11 +17,13 @@ package kafka.server -import java.util.concurrent.{Executors, Future} +import java.util.Random +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock import kafka.utils.CoreUtils.inLock - +import kafka.utils.TestUtils import org.apache.kafka.common.utils.Time import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -29,6 +31,7 @@ import org.junit.Assert._ class DelayedOperationTest { var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null + var executorService: ExecutorService = null @Before def setUp() { @@ -38,6 +41,8 @@ class DelayedOperationTest { @After def tearDown() { purgatory.shutdown() + if (executorService != null) + executorService.shutdown() } @Test @@ -122,6 +127,94 @@ class DelayedOperationTest { assertEquals(Nil, cancelledOperations) } + /** + * Verify that if there is lock contention between two threads attempting to complete, + * completion is performed without any blocking in either thread. 
+ */ + @Test + def testTryCompleteLockContention(): Unit = { + executorService = Executors.newSingleThreadExecutor() + val completionAttemptsRemaining = new AtomicInteger(Int.MaxValue) + val tryCompleteSemaphore = new Semaphore(1) + val key = "key" + + val op = new MockDelayedOperation(100000L, None, None) { + override def tryComplete() = { + val shouldComplete = completionAttemptsRemaining.decrementAndGet <= 0 + tryCompleteSemaphore.acquire() + try { + if (shouldComplete) + forceComplete() + else + false + } finally { + tryCompleteSemaphore.release() + } + } + } + + purgatory.tryCompleteElseWatch(op, Seq(key)) + completionAttemptsRemaining.set(2) + tryCompleteSemaphore.acquire() + val future = runOnAnotherThread(purgatory.checkAndComplete(key), shouldComplete = false) + TestUtils.waitUntilTrue(() => tryCompleteSemaphore.hasQueuedThreads, "Not attempting to complete") + purgatory.checkAndComplete(key) // this should not block even though lock is not free + assertFalse("Operation should not have completed", op.isCompleted) + tryCompleteSemaphore.release() + future.get(10, TimeUnit.SECONDS) + assertTrue("Operation should have completed", op.isCompleted) + } + + /** + * Test `tryComplete` with multiple threads to verify that there are no timing windows + * when completion is not performed even if the thread that makes the operation completable + * may not be able to acquire the operation lock. Since it is difficult to test all scenarios, + * this test uses random delays with a large number of threads. 
+ */ + @Test + def testTryCompleteWithMultipleThreads(): Unit = { + val executor = Executors.newScheduledThreadPool(20) + this.executorService = executor + val random = new Random + val maxDelayMs = 10 + val completionAttempts = 20 + + class TestDelayOperation(index: Int) extends MockDelayedOperation(10000L) { + val key = s"key$index" + val completionAttemptsRemaining = new AtomicInteger(completionAttempts) + + override def tryComplete(): Boolean = { + val shouldComplete = completable + Thread.sleep(random.nextInt(maxDelayMs)) + if (shouldComplete) + forceComplete() + else + false + } + } + val ops = (0 until 100).map { index => + val op = new TestDelayOperation(index) + purgatory.tryCompleteElseWatch(op, Seq(op.key)) + op + } + + def scheduleTryComplete(op: TestDelayOperation, delayMs: Long): Future[_] = { + executor.schedule(new Runnable { + override def run(): Unit = { + if (op.completionAttemptsRemaining.decrementAndGet() == 0) + op.completable = true + purgatory.checkAndComplete(op.key) + } + }, delayMs, TimeUnit.MILLISECONDS) + } + + (1 to completionAttempts).flatMap { _ => + ops.map { op => scheduleTryComplete(op, random.nextInt(maxDelayMs)) } + }.foreach { future => future.get } + + ops.foreach { op => assertTrue("Operation should have completed", op.isCompleted) } + } + @Test def testDelayedOperationLock() { verifyDelayedOperationLock(new MockDelayedOperation(100000L), mismatchedLocks = false) @@ -141,102 +234,97 @@ class DelayedOperationTest { def verifyDelayedOperationLock(mockDelayedOperation: => MockDelayedOperation, mismatchedLocks: Boolean) { val key = "key" - val executorService = Executors.newSingleThreadExecutor - try { - def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = { - (1 to count).map { _ => - val op = mockDelayedOperation - purgatory.tryCompleteElseWatch(op, Seq(key)) - assertFalse("Not completable", op.isCompleted) - op - } + executorService = Executors.newSingleThreadExecutor + def createDelayedOperations(count: Int): 
Seq[MockDelayedOperation] = { + (1 to count).map { _ => + val op = mockDelayedOperation + purgatory.tryCompleteElseWatch(op, Seq(key)) + assertFalse("Not completable", op.isCompleted) + op } + } - def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = { - (1 to count).map { _ => - val op = mockDelayedOperation - op.completable = true - op - } + def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = { + (1 to count).map { _ => + val op = mockDelayedOperation + op.completable = true + op } + } - def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = { - val future = executorService.submit(new Runnable { - def run() = fun - }) - if (shouldComplete) - future.get() - else - assertFalse("Should not have completed", future.isDone) - future - } + def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = { + completableOps.foreach(op => op.completable = true) + val completed = purgatory.checkAndComplete(key) + assertEquals(expectedComplete.size, completed) + expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted)) + val expectedNotComplete = completableOps.toSet -- expectedComplete + expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted)) + } - def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = { - completableOps.foreach(op => op.completable = true) - val completed = purgatory.checkAndComplete(key) - assertEquals(expectedComplete.size, completed) - expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted)) - val expectedNotComplete = completableOps.toSet -- expectedComplete - expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted)) - } + // If locks are free all completable operations should complete + var ops = createDelayedOperations(2) + checkAndComplete(ops, ops) - // If 
locks are free all completable operations should complete - var ops = createDelayedOperations(2) + // Lock held by current thread, completable operations should complete + ops = createDelayedOperations(2) + inLock(ops(1).lock) { checkAndComplete(ops, ops) + } - // Lock held by current thread, completable operations should complete - ops = createDelayedOperations(2) - inLock(ops(1).lock) { - checkAndComplete(ops, ops) - } + // Lock held by another thread, should not block, only operations that can be + // locked without blocking on the current thread should complete + ops = createDelayedOperations(2) + runOnAnotherThread(ops(0).lock.lock(), true) + try { + checkAndComplete(ops, Seq(ops(1))) + } finally { + runOnAnotherThread(ops(0).lock.unlock(), true) + checkAndComplete(Seq(ops(0)), Seq(ops(0))) + } - // Lock held by another thread, should not block, only operations that can be - // locked without blocking on the current thread should complete - ops = createDelayedOperations(2) - runOnAnotherThread(ops(0).lock.lock(), true) + // Lock acquired by response callback held by another thread, should not block + // if the response lock is used as operation lock, only operations + // that can be locked without blocking on the current thread should complete + ops = createDelayedOperations(2) + ops(0).responseLockOpt.foreach { lock => + runOnAnotherThread(lock.lock(), true) try { - checkAndComplete(ops, Seq(ops(1))) - } finally { - runOnAnotherThread(ops(0).lock.unlock(), true) - checkAndComplete(Seq(ops(0)), Seq(ops(0))) - } - - // Lock acquired by response callback held by another thread, should not block - // if the response lock is used as operation lock, only operations - // that can be locked without blocking on the current thread should complete - ops = createDelayedOperations(2) - ops(0).responseLockOpt.foreach { lock => - runOnAnotherThread(lock.lock(), true) try { - try { - checkAndComplete(ops, Seq(ops(1))) - assertFalse("Should have failed with mismatched locks", 
mismatchedLocks) - } catch { - case e: IllegalStateException => - assertTrue("Should not have failed with valid locks", mismatchedLocks) - } - } finally { - runOnAnotherThread(lock.unlock(), true) - checkAndComplete(Seq(ops(0)), Seq(ops(0))) + checkAndComplete(ops, Seq(ops(1))) + assertFalse("Should have failed with mismatched locks", mismatchedLocks) + } catch { + case e: IllegalStateException => + assertTrue("Should not have failed with valid locks", mismatchedLocks) } + } finally { + runOnAnotherThread(lock.unlock(), true) + checkAndComplete(Seq(ops(0)), Seq(ops(0))) } + } - // Immediately completable operations should complete without locking - ops = createCompletableOperations(2) - ops.foreach { op => - assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key))) - assertTrue("Should have completed", op.isCompleted) - } - - } finally { - executorService.shutdown() + // Immediately completable operations should complete without locking + ops = createCompletableOperations(2) + ops.foreach { op => + assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key))) + assertTrue("Should have completed", op.isCompleted) } } + private def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = { + val future = executorService.submit(new Runnable { + def run() = fun + }) + if (shouldComplete) + future.get() + else + assertFalse("Should not have completed", future.isDone) + future + } class MockDelayedOperation(delayMs: Long, - lockOpt: Option[ReentrantLock] = None, - val responseLockOpt: Option[ReentrantLock] = None) extends DelayedOperation(delayMs, lockOpt) { + lockOpt: Option[ReentrantLock] = None, + val responseLockOpt: Option[ReentrantLock] = None) + extends DelayedOperation(delayMs, lockOpt) { var completable = false def awaitExpiration() { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Delayed operations may not be completed when there is lock contention > --------------------------------------------------------------------- > > Key: KAFKA-6653 > URL: https://issues.apache.org/jira/browse/KAFKA-6653 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 0.11.0.2, 1.1.0, 1.0.1 > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > > If there is lock contention while multiple threads check if a delayed > operation may be completed (e.g. a produce request with acks=-1), only the > thread that acquires the lock without blocking attempts to complete the > operation. This change was made to avoid deadlocks under KAFKA-5970. But this > leaves a timing window in which an operation becomes ready to complete after > another thread has acquired the lock and performed the check for completion, > but not yet released the lock. In this case, the operation may never be > completed and will time out unless there are other operations with the same > key. The timeout was observed in a failed system test where a produce request > timed out, causing the test failure. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)