This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e380fbbcc8 KAFKA-19322 Remove the DelayedOperation constructor that 
accepts an external lock (#19798)
6e380fbbcc8 is described below

commit 6e380fbbcc8fde22d1f2bb3310e1270d5b3f4837
Author: YuChia Ma <79797958+mirai1...@users.noreply.github.com>
AuthorDate: Tue May 27 01:05:41 2025 +0800

    KAFKA-19322 Remove the DelayedOperation constructor that accepts an 
external lock (#19798)
    
    Remove the DelayedOperation constructor that accepts an external lock.
    
    See the related [PR](https://github.com/apache/kafka/pull/19759) for context.
    
    Reviewers: Ken Huang <s7133...@gmail.com>, PoAn Yang
     <pay...@apache.org>, TengYao Chi <kiting...@gmail.com>, Chia-Ping Tsai
     <chia7...@gmail.com>
---
 .../src/main/java/kafka/server/share/DelayedShareFetch.java |  2 +-
 core/src/main/scala/kafka/server/DelayedProduce.scala       |  7 ++-----
 core/src/main/scala/kafka/server/ReplicaManager.scala       |  7 +------
 .../coordinator/AbstractCoordinatorConcurrencyTest.scala    |  4 +---
 .../transaction/TransactionStateManagerTest.scala           |  7 -------
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala   |  3 ---
 .../org/apache/kafka/server/purgatory/DelayedOperation.java | 13 +------------
 7 files changed, 6 insertions(+), 37 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 8446b7acad6..727a88bc3fc 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -172,7 +172,7 @@ public class DelayedShareFetch extends DelayedOperation {
         Uuid fetchId,
         long remoteFetchMaxWaitMs
     ) {
-        super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
+        super(shareFetch.fetchParams().maxWaitMs);
         this.shareFetch = shareFetch;
         this.replicaManager = replicaManager;
         this.partitionsAcquired = new LinkedHashMap<>();
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala 
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index b60c79125b1..1d21ec78e4c 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import java.util.concurrent.locks.Lock
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.Meter
 import kafka.utils.Logging
@@ -30,7 +29,6 @@ import org.apache.kafka.server.purgatory.DelayedOperation
 
 import scala.collection._
 import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOption
 
 case class ProducePartitionStatus(requiredOffset: Long, responseStatus: 
PartitionResponse) {
   @volatile var acksPending = false
@@ -59,9 +57,8 @@ object DelayedProduce {
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicIdPartition, 
PartitionResponse] => Unit,
-                     lockOpt: Option[Lock])
-  extends DelayedOperation(delayMs, lockOpt.toJava) with Logging {
+                     responseCallback: Map[TopicIdPartition, 
PartitionResponse] => Unit)
+  extends DelayedOperation(delayMs) with Logging {
 
   override lazy val logger: Logger = DelayedProduce.logger
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b0b2e72602a..ba34b2af132 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -71,7 +71,6 @@ import java.lang.{Long => JLong}
 import java.nio.file.{Files, Paths}
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.Lock
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future, 
RejectedExecutionException, TimeUnit}
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import java.util.function.Consumer
@@ -723,7 +722,6 @@ class ReplicaManager(val config: KafkaConfig,
    *                                      If topic partition contains 
Uuid.ZERO_UUID as topicId the method
    *                                      will fall back to the old behaviour 
and rely on topic name.
    * @param responseCallback              callback for sending the response
-   * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on 
record conversions
    * @param requestLocal                  container for the stateful instances 
scoped to this request -- this must correspond to the
    *                                      thread calling this method
@@ -735,7 +733,6 @@ class ReplicaManager(val config: KafkaConfig,
                     origin: AppendOrigin,
                     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
                     responseCallback: Map[TopicIdPartition, PartitionResponse] 
=> Unit,
-                    delayedProduceLock: Option[Lock] = None,
                     recordValidationStatsCallback: Map[TopicIdPartition, 
RecordValidationStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.noCaching,
                     verificationGuards: Map[TopicPartition, VerificationGuard] 
= Map.empty): Unit = {
@@ -762,7 +759,6 @@ class ReplicaManager(val config: KafkaConfig,
 
     maybeAddDelayedProduce(
       requiredAcks,
-      delayedProduceLock,
       timeout,
       entriesPerPartition,
       localProduceResults,
@@ -967,7 +963,6 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeAddDelayedProduce(
     requiredAcks: Short,
-    delayedProduceLock: Option[Lock],
     timeoutMs: Long,
     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
     initialAppendResults: Map[TopicIdPartition, LogAppendResult],
@@ -977,7 +972,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
initialAppendResults)) {
       // create delayed produce operation
       val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
-      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, 
this, responseCallback, delayedProduceLock)
+      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, 
this, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this 
delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new 
TopicPartitionOperationKey(_)).toList
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 56ccb822947..2e9f95beb51 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -20,7 +20,6 @@ package kafka.coordinator
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors}
 import java.util.{Collections, Random}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.locks.Lock
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.cluster.Partition
 import kafka.log.LogManager
@@ -216,7 +215,6 @@ object AbstractCoordinatorConcurrencyTest {
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicIdPartition, 
MemoryRecords],
                                responseCallback: Map[TopicIdPartition, 
PartitionResponse] => Unit,
-                               delayedProduceLock: Option[Lock] = None,
                                processingStatsCallback: Map[TopicIdPartition, 
RecordValidationStats] => Unit = _ => (),
                                requestLocal: RequestLocal = 
RequestLocal.noCaching,
                                verificationGuards: Map[TopicPartition, 
VerificationGuard] = Map.empty): Unit = {
@@ -227,7 +225,7 @@ object AbstractCoordinatorConcurrencyTest {
         case (tp, _) =>
           (tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 
0L, RecordBatch.NO_TIMESTAMP, 0L)))
       })
-      val delayedProduce = new DelayedProduce(5, produceMetadata, this, 
responseCallback, delayedProduceLock) {
+      val delayedProduce = new DelayedProduce(5, produceMetadata, this, 
responseCallback) {
         // Complete produce requests after a few attempts to trigger delayed 
produce from different threads
         val completeAttempts = new AtomicInteger
         override def tryComplete(): Boolean = {
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 5923566d92a..212a915c800 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -19,7 +19,6 @@ package kafka.coordinator.transaction
 import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-import java.util.concurrent.locks.ReentrantLock
 import javax.management.ObjectName
 import kafka.server.ReplicaManager
 import kafka.utils.TestUtils
@@ -758,7 +757,6 @@ class TransactionStateManagerTest {
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       any(),
       any(),
-      any[Option[ReentrantLock]],
       any(),
       any(),
       any()
@@ -803,7 +801,6 @@ class TransactionStateManagerTest {
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       any(),
       any(),
-      any[Option[ReentrantLock]],
       any(),
       any(),
       any()
@@ -847,7 +844,6 @@ class TransactionStateManagerTest {
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       any(),
       any(),
-      any[Option[ReentrantLock]],
       any(),
       any(),
       any())
@@ -901,7 +897,6 @@ class TransactionStateManagerTest {
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       any(),
       any(),
-      any[Option[ReentrantLock]],
       any(),
       any(),
       any()
@@ -1118,7 +1113,6 @@ class TransactionStateManagerTest {
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       recordsCapture.capture(),
       callbackCapture.capture(),
-      any[Option[ReentrantLock]],
       any(),
       any(),
       any()
@@ -1271,7 +1265,6 @@ class TransactionStateManagerTest {
       origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       any[Map[TopicIdPartition, MemoryRecords]],
       capturedArgument.capture(),
-      any[Option[ReentrantLock]],
       any(),
       any(),
       any()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5155b9f3aed..c0e6c7b9c91 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2830,7 +2830,6 @@ class KafkaApisTest extends Logging {
       any(),
       responseCallback.capture(),
       any(),
-      any(),
       ArgumentMatchers.eq(requestLocal),
       any()
     )).thenAnswer(_ => responseCallback.getValue.apply(Map(new 
TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
@@ -2886,7 +2885,6 @@ class KafkaApisTest extends Logging {
       any(),
       any(),
       any(),
-      any(),
       ArgumentMatchers.eq(requestLocal),
       any())
   }
@@ -2965,7 +2963,6 @@ class KafkaApisTest extends Logging {
       entriesPerPartition.capture(),
       responseCallback.capture(),
       any(),
-      any(),
       ArgumentMatchers.eq(RequestLocal.noCaching),
       any()
     )).thenAnswer { _ =>
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
 
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
index 16bfdabdb67..82b91c44cca 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
@@ -18,8 +18,6 @@ package org.apache.kafka.server.purgatory;
 
 import org.apache.kafka.server.util.timer.TimerTask;
 
-import java.util.Optional;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -41,19 +39,10 @@ public abstract class DelayedOperation extends TimerTask {
 
     private volatile boolean completed = false;
 
-    protected final Lock lock;
-
-    public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
-        this(delayMs, lockOpt.orElse(new ReentrantLock()));
-    }
+    protected final ReentrantLock lock = new ReentrantLock();
 
     public DelayedOperation(long delayMs) {
-        this(delayMs, new ReentrantLock());
-    }
-
-    public DelayedOperation(long delayMs, Lock lock) {
         super(delayMs);
-        this.lock = lock;
     }
 
     /*

Reply via email to