This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cebec91470c KAFKA-18080 Replace DelayedItem by Long type (#17927)
cebec91470c is described below
commit cebec91470c440a1735e50210ae3fc0f62ee40a7
Author: Yung <[email protected]>
AuthorDate: Tue Nov 26 17:00:27 2024 +0800
KAFKA-18080 Replace DelayedItem by Long type (#17927)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 19 ++++++----
core/src/main/scala/kafka/utils/DelayedItem.scala | 44 ----------------------
.../server/ReplicaAlterLogDirsThreadTest.scala | 8 ++--
3 files changed, 15 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 061e7b1171d..9e7b1ee6e99 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.server
import com.yammer.metrics.core.Meter
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.CoreUtils.inLock
-import kafka.utils.{DelayedItem, Logging, Pool}
+import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.PartitionStates
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -30,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{ClientIdAndBroker, InvalidRecordException,
TopicPartition, Uuid}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -789,7 +790,7 @@ abstract class AbstractFetcherThread(name: String,
Option(partitionStates.stateValue(partition)).foreach {
currentFetchState =>
if (!currentFetchState.isDelayed) {
partitionStates.updateAndMoveToEnd(partition,
PartitionFetchState(currentFetchState.topicId, currentFetchState.fetchOffset,
- currentFetchState.lag, currentFetchState.currentLeaderEpoch,
Some(new DelayedItem(delay)),
+ currentFetchState.lag, currentFetchState.currentLeaderEpoch,
Some(delay),
currentFetchState.state, currentFetchState.lastFetchedEpoch))
}
}
@@ -945,25 +946,27 @@ object PartitionFetchState {
/**
* case class to keep partition offset and its state(truncatingLog, delayed)
* This represents a partition as being either:
- * (1) Truncating its log, for example having recently become a follower
- * (2) Delayed, for example due to an error, where we subsequently back off a
bit
- * (3) ReadyForFetch, the is the active state where the thread is actively
fetching data.
+ * (1) Truncating its log, for example, having recently become a follower
+ * (2) Delayed, for example, due to an error, where we subsequently back off a
bit
+ * (3) ReadyForFetch, the active state where the thread is actively fetching
data.
*/
case class PartitionFetchState(topicId: Option[Uuid],
fetchOffset: Long,
lag: Option[Long],
currentLeaderEpoch: Int,
- delay: Option[DelayedItem],
+ delay: Option[Long],
state: ReplicaState,
lastFetchedEpoch: Option[Int]) {
+ private val dueMs = delay.map(_ + Time.SYSTEM.milliseconds)
+
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
def isTruncating: Boolean = state == Truncating && !isDelayed
- def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
+ def isDelayed: Boolean = dueMs.exists(_ > Time.SYSTEM.milliseconds)
override def toString: String = {
s"FetchState(topicId=$topicId" +
@@ -972,7 +975,7 @@ case class PartitionFetchState(topicId: Option[Uuid],
s", lastFetchedEpoch=$lastFetchedEpoch" +
s", state=$state" +
s", lag=$lag" +
- s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
+ s", delay=${delay.getOrElse(0)}ms" +
s")"
}
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala
b/core/src/main/scala/kafka/utils/DelayedItem.scala
deleted file mode 100644
index cfb87719a00..00000000000
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent._
-
-import org.apache.kafka.common.utils.Time
-
-import scala.math._
-
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
- private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
- def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
-
- /**
- * The remaining delay time
- */
- def getDelay(unit: TimeUnit): Long = {
- unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0),
TimeUnit.MILLISECONDS)
- }
-
- def compareTo(d: Delayed): Int = {
- val other = d.asInstanceOf[DelayedItem]
- java.lang.Long.compare(dueMs, other.dueMs)
- }
-
-}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 98436915dbf..d61ca335b47 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -22,7 +22,7 @@ import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
import kafka.server.metadata.ZkMetadataCache
-import kafka.utils.{DelayedItem, TestUtils}
+import kafka.utils.TestUtils
import org.apache.kafka.common.errors.KafkaStorageException
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -1168,7 +1168,7 @@ class ReplicaAlterLogDirsThreadTest {
// one partition is ready and one is delayed
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) =
thread.leader.buildFetch(Map(
t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch,
state = Fetching, lastFetchedEpoch = None),
- t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch,
delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch =
None)))
+ t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch,
delay = Some(5000), state = Fetching, lastFetchedEpoch = None)))
assertTrue(fetchRequest2Opt.isDefined)
val fetchRequest2 = fetchRequest2Opt.get
@@ -1181,8 +1181,8 @@ class ReplicaAlterLogDirsThreadTest {
// both partitions are delayed
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) =
thread.leader.buildFetch(Map(
- t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch,
delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None),
- t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch,
delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch =
None)))
+ t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch,
delay = Some(5000), state = Fetching, lastFetchedEpoch = None),
+ t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch,
delay = Some(5000), state = Fetching, lastFetchedEpoch = None)))
assertTrue(fetchRequest3Opt.isEmpty, "Expected no fetch requests since all
partitions are delayed")
assertFalse(partitionsWithError3.nonEmpty)
}