This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cebec91470c KAFKA-18080 Replace DelayedItem by Long type (#17927)
cebec91470c is described below

commit cebec91470c440a1735e50210ae3fc0f62ee40a7
Author: Yung <[email protected]>
AuthorDate: Tue Nov 26 17:00:27 2024 +0800

    KAFKA-18080 Replace DelayedItem by Long type (#17927)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 19 ++++++----
 core/src/main/scala/kafka/utils/DelayedItem.scala  | 44 ----------------------
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  8 ++--
 3 files changed, 15 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 061e7b1171d..9e7b1ee6e99 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.server
 import com.yammer.metrics.core.Meter
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
 import kafka.utils.CoreUtils.inLock
-import kafka.utils.{DelayedItem, Logging, Pool}
+import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -30,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{ClientIdAndBroker, InvalidRecordException, 
TopicPartition, Uuid}
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -789,7 +790,7 @@ abstract class AbstractFetcherThread(name: String,
         Option(partitionStates.stateValue(partition)).foreach { 
currentFetchState =>
           if (!currentFetchState.isDelayed) {
             partitionStates.updateAndMoveToEnd(partition, 
PartitionFetchState(currentFetchState.topicId, currentFetchState.fetchOffset,
-              currentFetchState.lag, currentFetchState.currentLeaderEpoch, 
Some(new DelayedItem(delay)),
+              currentFetchState.lag, currentFetchState.currentLeaderEpoch, 
Some(delay),
               currentFetchState.state, currentFetchState.lastFetchedEpoch))
           }
         }
@@ -945,25 +946,27 @@ object PartitionFetchState {
 /**
  * case class to keep partition offset and its state(truncatingLog, delayed)
  * This represents a partition as being either:
- * (1) Truncating its log, for example having recently become a follower
- * (2) Delayed, for example due to an error, where we subsequently back off a 
bit
- * (3) ReadyForFetch, the is the active state where the thread is actively 
fetching data.
+ * (1) Truncating its log, for example, having recently become a follower
+ * (2) Delayed, for example, due to an error, where we subsequently back off a 
bit
+ * (3) ReadyForFetch, the active state where the thread is actively fetching 
data.
  */
 case class PartitionFetchState(topicId: Option[Uuid],
                                fetchOffset: Long,
                                lag: Option[Long],
                                currentLeaderEpoch: Int,
-                               delay: Option[DelayedItem],
+                               delay: Option[Long],
                                state: ReplicaState,
                                lastFetchedEpoch: Option[Int]) {
 
+  private val dueMs = delay.map(_ + Time.SYSTEM.milliseconds)
+
   def isReadyForFetch: Boolean = state == Fetching && !isDelayed
 
   def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
 
   def isTruncating: Boolean = state == Truncating && !isDelayed
 
-  def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
+  def isDelayed: Boolean = dueMs.exists(_ > Time.SYSTEM.milliseconds)
 
   override def toString: String = {
     s"FetchState(topicId=$topicId" +
@@ -972,7 +975,7 @@ case class PartitionFetchState(topicId: Option[Uuid],
       s", lastFetchedEpoch=$lastFetchedEpoch" +
       s", state=$state" +
       s", lag=$lag" +
-      s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
+      s", delay=${delay.getOrElse(0)}ms" +
       s")"
   }
 
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala 
b/core/src/main/scala/kafka/utils/DelayedItem.scala
deleted file mode 100644
index cfb87719a00..00000000000
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent._
-
-import org.apache.kafka.common.utils.Time
-
-import scala.math._
-
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
-
-  /**
-   * The remaining delay time
-   */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
-  }
-
-  def compareTo(d: Delayed): Int = {
-    val other = d.asInstanceOf[DelayedItem]
-    java.lang.Long.compare(dueMs, other.dueMs)
-  }
-
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 98436915dbf..d61ca335b47 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -22,7 +22,7 @@ import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
 import kafka.server.metadata.ZkMetadataCache
-import kafka.utils.{DelayedItem, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.KafkaStorageException
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -1168,7 +1168,7 @@ class ReplicaAlterLogDirsThreadTest {
     // one partition is ready and one is delayed
     val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = 
thread.leader.buildFetch(Map(
         t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, 
state = Fetching, lastFetchedEpoch = None),
-        t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, 
delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = 
None)))
+        t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, 
delay = Some(5000), state = Fetching, lastFetchedEpoch = None)))
 
     assertTrue(fetchRequest2Opt.isDefined)
     val fetchRequest2 = fetchRequest2Opt.get
@@ -1181,8 +1181,8 @@ class ReplicaAlterLogDirsThreadTest {
 
     // both partitions are delayed
     val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = 
thread.leader.buildFetch(Map(
-        t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, 
delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None),
-        t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, 
delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = 
None)))
+        t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, 
delay = Some(5000), state = Fetching, lastFetchedEpoch = None),
+        t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, 
delay = Some(5000), state = Fetching, lastFetchedEpoch = None)))
     assertTrue(fetchRequest3Opt.isEmpty, "Expected no fetch requests since all 
partitions are delayed")
     assertFalse(partitionsWithError3.nonEmpty)
   }

Reply via email to