This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6e380fbbcc8 KAFKA-19322 Remove the DelayedOperation constructor that accepts an external lock (#19798) 6e380fbbcc8 is described below commit 6e380fbbcc8fde22d1f2bb3310e1270d5b3f4837 Author: YuChia Ma <79797958+mirai1...@users.noreply.github.com> AuthorDate: Tue May 27 01:05:41 2025 +0800 KAFKA-19322 Remove the DelayedOperation constructor that accepts an external lock (#19798) Remove the DelayedOperation constructor that accepts an external lock. According to this [PR](https://github.com/apache/kafka/pull/19759). Reviewers: Ken Huang <s7133...@gmail.com>, PoAn Yang <pay...@apache.org>, TengYao Chi <kiting...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../src/main/java/kafka/server/share/DelayedShareFetch.java | 2 +- core/src/main/scala/kafka/server/DelayedProduce.scala | 7 ++----- core/src/main/scala/kafka/server/ReplicaManager.scala | 7 +------ .../coordinator/AbstractCoordinatorConcurrencyTest.scala | 4 +--- .../transaction/TransactionStateManagerTest.scala | 7 ------- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 3 --- .../org/apache/kafka/server/purgatory/DelayedOperation.java | 13 +------------ 7 files changed, 6 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 8446b7acad6..727a88bc3fc 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -172,7 +172,7 @@ public class DelayedShareFetch extends DelayedOperation { Uuid fetchId, long remoteFetchMaxWaitMs ) { - super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); + super(shareFetch.fetchParams().maxWaitMs); this.shareFetch = shareFetch; this.replicaManager = replicaManager; this.partitionsAcquired = new LinkedHashMap<>(); diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index b60c79125b1..1d21ec78e4c 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -18,7 +18,6 @@ package kafka.server import java.util.concurrent.{ConcurrentHashMap, TimeUnit} -import java.util.concurrent.locks.Lock import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.Meter import kafka.utils.Logging @@ -30,7 +29,6 @@ import org.apache.kafka.server.purgatory.DelayedOperation import scala.collection._ import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.RichOption case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) { @volatile var acksPending = false @@ -59,9 +57,8 @@ object DelayedProduce { class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, - lockOpt: Option[Lock]) - extends DelayedOperation(delayMs, lockOpt.toJava) with Logging { + responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit) + extends DelayedOperation(delayMs) with Logging { override lazy val logger: Logger = DelayedProduce.logger diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b0b2e72602a..ba34b2af132 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -71,7 +71,6 @@ import java.lang.{Long => JLong} import java.nio.file.{Files, Paths} import java.util import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.Lock import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future, RejectedExecutionException, TimeUnit} import java.util.{Collections, Optional, OptionalInt, OptionalLong} import java.util.function.Consumer @@ -723,7 +722,6 @@ class ReplicaManager(val config: KafkaConfig, * If topic partition contains Uuid.ZERO_UUID as topicId the method * will fall back to the old behaviour and rely on topic name. * @param responseCallback callback for sending the response - * @param delayedProduceLock lock for the delayed actions * @param recordValidationStatsCallback callback for updating stats on record conversions * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the * thread calling this method @@ -735,7 +733,6 @@ class ReplicaManager(val config: KafkaConfig, origin: AppendOrigin, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Lock] = None, recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.noCaching, verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { @@ -762,7 +759,6 @@ class ReplicaManager(val config: KafkaConfig, maybeAddDelayedProduce( requiredAcks, - delayedProduceLock, timeout, entriesPerPartition, localProduceResults, @@ -967,7 +963,6 @@ class ReplicaManager(val config: KafkaConfig, private def maybeAddDelayedProduce( requiredAcks: Short, - delayedProduceLock: Option[Lock], timeoutMs: Long, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], initialAppendResults: Map[TopicIdPartition, LogAppendResult], @@ -977,7 +972,7 @@ class ReplicaManager(val config: KafkaConfig, if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus) - val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock) + val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed produce operation val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 56ccb822947..2e9f95beb51 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -20,7 +20,6 @@ package kafka.coordinator import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors} import java.util.{Collections, Random} import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.locks.Lock import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ import kafka.cluster.Partition import kafka.log.LogManager @@ -216,7 +215,6 @@ object AbstractCoordinatorConcurrencyTest { origin: AppendOrigin, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Lock] = None, processingStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.noCaching, verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { @@ -227,7 +225,7 @@ object AbstractCoordinatorConcurrencyTest { case (tp, _) => (tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))) }) - val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback, delayedProduceLock) { + val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback) { // Complete produce requests after a few attempts to trigger delayed produce from different threads val completeAttempts = new AtomicInteger override def tryComplete(): Boolean = { diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 5923566d92a..212a915c800 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -19,7 +19,6 @@ package kafka.coordinator.transaction import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} -import java.util.concurrent.locks.ReentrantLock import javax.management.ObjectName import kafka.server.ReplicaManager import kafka.utils.TestUtils @@ -758,7 +757,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any() @@ -803,7 +801,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any() @@ -847,7 +844,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any()) @@ -901,7 +897,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any(), any(), - any[Option[ReentrantLock]], any(), any(), any() @@ -1118,7 +1113,6 @@ class TransactionStateManagerTest { ArgumentMatchers.eq(AppendOrigin.COORDINATOR), recordsCapture.capture(), callbackCapture.capture(), - any[Option[ReentrantLock]], any(), any(), any() @@ -1271,7 +1265,6 @@ class TransactionStateManagerTest { origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), any[Map[TopicIdPartition, MemoryRecords]], capturedArgument.capture(), - any[Option[ReentrantLock]], any(), any(), any() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5155b9f3aed..c0e6c7b9c91 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2830,7 +2830,6 @@ class KafkaApisTest extends Logging { any(), responseCallback.capture(), any(), - any(), ArgumentMatchers.eq(requestLocal), any() )).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)))) @@ -2886,7 +2885,6 @@ class KafkaApisTest extends Logging { any(), any(), any(), - any(), ArgumentMatchers.eq(requestLocal), any()) } @@ -2965,7 +2963,6 @@ class KafkaApisTest extends Logging { entriesPerPartition.capture(), responseCallback.capture(), any(), - any(), ArgumentMatchers.eq(RequestLocal.noCaching), any() )).thenAnswer { _ => diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java index 16bfdabdb67..82b91c44cca 100644 --- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java +++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java @@ -18,8 +18,6 @@ package org.apache.kafka.server.purgatory; import org.apache.kafka.server.util.timer.TimerTask; -import java.util.Optional; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** @@ -41,19 +39,10 @@ public abstract class DelayedOperation extends TimerTask { private volatile boolean completed = false; - protected final Lock lock; - - public DelayedOperation(long delayMs, Optional<Lock> lockOpt) { - this(delayMs, lockOpt.orElse(new ReentrantLock())); - } + protected final ReentrantLock lock = new ReentrantLock(); public DelayedOperation(long delayMs) { - this(delayMs, new ReentrantLock()); - } - - public DelayedOperation(long delayMs, Lock lock) { super(delayMs); - this.lock = lock; } /*