This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aa7a3dbd303 KAFKA-18027: MINOR: Correct DelayedOperationPurgatory code around adding of an already completed operation (#17842)
aa7a3dbd303 is described below
commit aa7a3dbd3037bbf0de703361c4655335ce6402c4
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Nov 20 22:56:44 2024 +0530
KAFKA-18027: MINOR: Correct DelayedOperationPurgatory code around adding of an already completed operation (#17842)
Reviewers: Apoorv Mittal <[email protected]>, Jun Rao <[email protected]>
---
.../org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
index 3491aee139e..380f22c9c8e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
+++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
@@ -147,7 +147,10 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
// any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
// holding an exclusive lock to make the call is often unnecessary.
if (operation.safeTryCompleteOrElse(() -> {
- watchKeys.forEach(key -> watchForOperation(key, operation));
+ watchKeys.forEach(key -> {
+ if (!operation.isCompleted())
+ watchForOperation(key, operation);
+ });
if (!watchKeys.isEmpty())
estimatedTotalOperations.incrementAndGet();
})) {
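
The guard added in this hunk can be illustrated in isolation. The following is a minimal sketch, not the Kafka implementation: `PurgatorySketch`, `Op`, `watchAll`, and `watched` are hypothetical names, and the plain `HashMap` stands in for the purgatory's real watcher structures. It only shows the idea that an operation which is already completed should not be registered under any further watch key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, single-threaded model of the watch-registration loop.
public class PurgatorySketch {

    // Hypothetical stand-in for a delayed operation; it only tracks completion.
    public static final class Op {
        private final AtomicBoolean completed = new AtomicBoolean(false);

        public boolean isCompleted() {
            return completed.get();
        }

        // Returns true only for the caller that actually completes the operation.
        public boolean forceComplete() {
            return completed.compareAndSet(false, true);
        }
    }

    // Watch key -> operations registered under that key.
    private final Map<String, List<Op>> watchers = new HashMap<>();

    // Registers op under each key, re-checking completion before every
    // registration so a completed operation is never added to a watch list.
    // Returns how many keys the operation was registered under.
    public int watchAll(List<String> keys, Op op) {
        int added = 0;
        for (String key : keys) {
            if (!op.isCompleted()) {
                watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
                added++;
            }
        }
        return added;
    }

    // Number of operations currently registered under key.
    public int watched(String key) {
        List<Op> ops = watchers.get(key);
        return ops == null ? 0 : ops.size();
    }
}
```

In this sketch, a completed operation ends up on no watch list, while a pending one is registered under every key. The check is repeated per key because, in a concurrent setting, completion could happen between two registrations.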