This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aa7a3dbd303 KAFKA-18027: MINOR: Correct DelayedOperationPurgatory code around adding of an already completed operation (#17842)
aa7a3dbd303 is described below

commit aa7a3dbd3037bbf0de703361c4655335ce6402c4
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Nov 20 22:56:44 2024 +0530

    KAFKA-18027: MINOR: Correct DelayedOperationPurgatory code around adding of an already completed operation (#17842)
    
    Reviewers: Apoorv Mittal <[email protected]>, Jun Rao <[email protected]>
---
 .../org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
index 3491aee139e..380f22c9c8e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
+++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
@@ -147,7 +147,10 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
         // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
         // holding an exclusive lock to make the call is often unnecessary.
         if (operation.safeTryCompleteOrElse(() -> {
-            watchKeys.forEach(key -> watchForOperation(key, operation));
+            watchKeys.forEach(key -> {
+                if (!operation.isCompleted())
+                    watchForOperation(key, operation);
+            });
             if (!watchKeys.isEmpty())
                 estimatedTotalOperations.incrementAndGet();
         })) {
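
The change above re-checks operation.isCompleted() before each watchForOperation() call, presumably so that an operation completed partway through the loop is not added to further watch lists. Below is a minimal, standalone sketch of that guard. It is not Kafka's code: GuardedWatchSketch, Op, watchAll and the afterEachWatch hook are hypothetical names used only for illustration, and the hook merely simulates a completion that happens between two registrations.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedWatchSketch {
    // Hypothetical stand-in for a delayed operation; not Kafka's DelayedOperation.
    static class Op {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        boolean isCompleted() { return completed.get(); }
        void complete() { completed.set(true); }
    }

    // Walks the keys and "watches" op under each one, but only while op is
    // still incomplete. Returns how many watch registrations were made.
    // afterEachWatch is an illustrative hook that runs after every registration.
    static int watchAll(List<String> keys, Op op, Runnable afterEachWatch) {
        int watched = 0;
        for (String key : keys) {
            if (!op.isCompleted()) {
                watched++;            // stands in for watchForOperation(key, op)
                afterEachWatch.run();
            }
        }
        return watched;
    }

    public static void main(String[] args) {
        Op op = new Op();
        // Simulate the operation being completed right after the first key is watched.
        int watched = watchAll(List.of("k1", "k2", "k3"), op, op::complete);
        System.out.println("watch registrations: " + watched);
    }
}
```

With the guard in place, the simulated run registers the operation under the first key only; with a no-op hook it would be registered under all three.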
