This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2cfd28ed8ba KAFKA-19915: [2/3] Move HostedPartition to server module
(#21475)
2cfd28ed8ba is described below
commit 2cfd28ed8ba894b6f72cbe476987863d6078fc35
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Feb 18 04:55:21 2026 +0000
KAFKA-19915: [2/3] Move HostedPartition to server module (#21475)
This PR ports `HostedPartition` from a Scala sealed trait in
`ReplicaManager` to a Java sealed interface in the server module.
Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
checkstyle/import-control-storage.xml | 3 +-
.../main/scala/kafka/server/ReplicaManager.scala | 114 +++++++++------------
.../server/ReplicaManagerConcurrencyTest.scala | 5 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 79 +++++++-------
.../org/apache/kafka/server/HostedPartition.java | 41 ++++++++
.../org/apache/kafka/server/KRaftClusterTest.java | 4 +-
.../TransactionsWithTieredStoreTest.java | 2 +-
7 files changed, 138 insertions(+), 110 deletions(-)
diff --git a/checkstyle/import-control-storage.xml
b/checkstyle/import-control-storage.xml
index fb511c4fb79..44f25daf5bc 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -129,8 +129,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.storage"/>
- <allow pkg="org.apache.kafka.server.log" />
- <allow pkg="org.apache.kafka.server.config" />
+ <allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6e433ce1526..67dc3dc9463 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,6 @@ package kafka.server
import com.yammer.metrics.core.Meter
import kafka.cluster.Partition
import kafka.log.LogManager
-import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName,
FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName,
IsrShrinksPerSecMetricName, LeaderCountMetricName,
OfflineReplicaCountMetricName, PartitionCountMetricName,
PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName,
ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName,
UnderReplicatedPartitionsMetricName, createLogReadResult,
isListOffsetsTimestampUnsupported}
import kafka.server.share.DelayedShareFetch
@@ -64,7 +63,7 @@ import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue,
HostedPartition, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints,
OffsetCheckpointFile, OffsetCheckpoints}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult,
OffsetResultHolder, RecordValidationException, RecordValidationStats,
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -107,31 +106,6 @@ case class LogDeleteRecordsResult(requestedOffset: Long,
lowWatermark: Long, exc
}
}
-/**
- * Trait to represent the state of hosted partitions. We create a concrete
(active) Partition
- * instance when the broker receives a LeaderAndIsr request from the
controller or a metadata
- * log record from the Quorum controller indicating that the broker should be
either a leader
- * or follower of a partition.
- */
-sealed trait HostedPartition
-
-object HostedPartition {
- /**
- * This broker does not have any state for this partition locally.
- */
- final object None extends HostedPartition
-
- /**
- * This broker hosts the partition and it is online.
- */
- final case class Online(partition: Partition) extends HostedPartition
-
- /**
- * This broker hosts the partition, but it is in an offline log directory.
- */
- final case class Offline(partition: Option[Partition]) extends
HostedPartition
-}
-
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
@@ -256,7 +230,7 @@ class ReplicaManager(val config: KafkaConfig,
/* epoch of the controller that last changed the leader */
protected val localBrokerId = config.brokerId
- protected val allPartitions = new ConcurrentHashMap[TopicPartition,
HostedPartition]
+ protected val allPartitions = new ConcurrentHashMap[TopicPartition,
HostedPartition[Partition]]
private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager(metrics, time,
quotaManagers.follower)
private[server] val replicaAlterLogDirsManager =
createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
@@ -340,8 +314,9 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeRemoveTopicMetrics(topic: String): Unit = {
val topicHasNonOfflinePartition = allPartitions.values.asScala.exists {
- case online: HostedPartition.Online => topic == online.partition.topic
- case HostedPartition.None | HostedPartition.Offline(_) => false
+ case online: HostedPartition.Online[Partition] => topic ==
online.partition.topic
+ case _: HostedPartition.Offline[Partition] => false
+ case _: HostedPartition.None[Partition] => false
}
if (!topicHasNonOfflinePartition) // nothing online or deferred
brokerTopicStats.removeMetrics(topic)
@@ -400,8 +375,8 @@ class ReplicaManager(val config: KafkaConfig,
*/
def maybeAddListener(partition: TopicPartition, listener:
PartitionListener): Boolean = {
getPartition(partition) match {
- case HostedPartition.Online(partition) =>
- partition.maybeAddListener(listener)
+ case online: HostedPartition.Online[Partition] =>
+ online.partition.maybeAddListener(listener)
case _ =>
false
}
@@ -412,8 +387,8 @@ class ReplicaManager(val config: KafkaConfig,
*/
def removeListener(partition: TopicPartition, listener: PartitionListener):
Unit = {
getPartition(partition) match {
- case HostedPartition.Online(partition) =>
- partition.removeListener(listener)
+ case online: HostedPartition.Online[Partition] =>
+ online.partition.removeListener(listener)
case _ => // Ignore
}
}
@@ -441,7 +416,7 @@ class ReplicaManager(val config: KafkaConfig,
var topicId: Option[Uuid] = None
if (stopPartition.deleteLocalLog) {
getPartition(topicPartition) match {
- case hostedPartition: HostedPartition.Online =>
+ case hostedPartition: HostedPartition.Online[Partition] =>
if (allPartitions.remove(topicPartition, hostedPartition)) {
maybeRemoveTopicMetrics(topicPartition.topic)
// Logs are not deleted here. They are deleted in a single batch
later on.
@@ -483,13 +458,13 @@ class ReplicaManager(val config: KafkaConfig,
new TopicIdPartition(topicId, topicPartition)
}
- def getPartition(topicPartition: TopicPartition): HostedPartition = {
- Option(allPartitions.get(topicPartition)).getOrElse(HostedPartition.None)
+ def getPartition(topicPartition: TopicPartition): HostedPartition[Partition]
= {
+ Option(allPartitions.get(topicPartition)).getOrElse(new
HostedPartition.None[Partition])
}
def isAddingReplica(topicPartition: TopicPartition, replicaId: Int): Boolean
= {
getPartition(topicPartition) match {
- case Online(partition) => partition.isAddingReplica(replicaId)
+ case online: HostedPartition.Online[Partition] =>
online.partition.isAddingReplica(replicaId)
case _ => false
}
}
@@ -503,12 +478,12 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing
private[server] def addOnlinePartition(topicPartition: TopicPartition,
partition: Partition): Unit = {
- allPartitions.put(topicPartition, HostedPartition.Online(partition))
+ allPartitions.put(topicPartition, new HostedPartition.Online(partition))
}
def onlinePartition(topicPartition: TopicPartition): Option[Partition] = {
getPartition(topicPartition) match {
- case HostedPartition.Online(partition) => Some(partition)
+ case online: HostedPartition.Online[Partition] => Some(online.partition)
case _ => None
}
}
@@ -517,13 +492,13 @@ class ReplicaManager(val config: KafkaConfig,
// the iterator has been constructed could still be returned by this
iterator.
private def onlinePartitionsIterator: Iterator[Partition] = {
allPartitions.values.asScala.iterator.flatMap {
- case HostedPartition.Online(partition) => Some(partition)
+ case online: HostedPartition.Online[Partition] => Some(online.partition)
case _ => None
}
}
private def offlinePartitionCount: Int = {
- allPartitions.values.asScala.iterator.count(_.getClass ==
HostedPartition.Offline.getClass)
+ allPartitions.values.asScala.iterator.count(_.getClass ==
classOf[HostedPartition.Offline[Partition]])
}
def getPartitionOrException(topicPartition: TopicPartition): Partition = {
@@ -561,20 +536,20 @@ class ReplicaManager(val config: KafkaConfig,
def getPartitionOrError(topicPartition: TopicPartition): Either[Errors,
Partition] = {
getPartition(topicPartition) match {
- case HostedPartition.Online(partition) =>
- Right(partition)
+ case online: HostedPartition.Online[Partition] =>
+ Right(online.partition)
- case HostedPartition.Offline(_) =>
+ case _: HostedPartition.Offline[Partition] =>
Left(Errors.KAFKA_STORAGE_ERROR)
- case HostedPartition.None if metadataCache.contains(topicPartition) =>
+ case _: HostedPartition.None[Partition] if
metadataCache.contains(topicPartition) =>
// The topic exists, but this broker is no longer a replica of it, so
we return NOT_LEADER_OR_FOLLOWER which
// forces clients to refresh metadata to find the new location. This
can happen, for example,
// during a partition reassignment if a produce request from the
client is sent to a broker after
// the local replica has been deleted.
Left(Errors.NOT_LEADER_OR_FOLLOWER)
- case HostedPartition.None =>
+ case _: HostedPartition.None[Partition] =>
Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
}
@@ -1162,7 +1137,8 @@ class ReplicaManager(val config: KafkaConfig,
throw new KafkaStorageException(s"Log directory $destinationDir is
offline")
getPartition(topicPartition) match {
- case HostedPartition.Online(partition) =>
+ case online: HostedPartition.Online[Partition] =>
+ val partition = online.partition
// Stop current replica movement if the destinationDir is
different from the existing destination log directory
if (partition.futureReplicaDirChanged(destinationDir)) {
replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
@@ -1176,10 +1152,10 @@ class ReplicaManager(val config: KafkaConfig,
logManager.resumeCleaning(topicPartition)
}
}
- case HostedPartition.Offline(_) =>
+ case _: HostedPartition.Offline[Partition] =>
throw new KafkaStorageException(s"Partition $topicPartition is
offline")
- case HostedPartition.None => // Do nothing
+ case _: HostedPartition.None[Partition] => // Do nothing
}
// If the log for this partition has not been created yet:
@@ -1331,7 +1307,8 @@ class ReplicaManager(val config: KafkaConfig,
if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
def onAcks(topicPartition: TopicPartition, status:
DeleteRecordsPartitionStatus): Unit = {
val (lowWatermarkReached, error, lw) = getPartition(topicPartition)
match {
- case HostedPartition.Online(partition) =>
+ case online: HostedPartition.Online[Partition] =>
+ val partition = online.partition
partition.leaderLogIfLocal match {
case Some(_) =>
val leaderLW = partition.lowWatermarkIfLeader
@@ -1340,10 +1317,10 @@ class ReplicaManager(val config: KafkaConfig,
(false, Errors.NOT_LEADER_OR_FOLLOWER,
DeleteRecordsResponse.INVALID_LOW_WATERMARK)
}
- case HostedPartition.Offline(_) =>
+ case _: HostedPartition.Offline[Partition] =>
(false, Errors.KAFKA_STORAGE_ERROR,
DeleteRecordsResponse.INVALID_LOW_WATERMARK)
- case HostedPartition.None =>
+ case _: HostedPartition.None[Partition] =>
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION,
DeleteRecordsResponse.INVALID_LOW_WATERMARK)
}
if (error != Errors.NONE || lowWatermarkReached) {
@@ -2152,11 +2129,12 @@ class ReplicaManager(val config: KafkaConfig,
def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock
synchronized {
allPartitions.get(tp) match {
- case HostedPartition.Online(partition) =>
- allPartitions.put(tp, HostedPartition.Offline(Some(partition)))
+ case online: HostedPartition.Online[Partition] =>
+ val partition = online.partition
+ allPartitions.put(tp, new
HostedPartition.Offline(Optional.of(partition)))
partition.markOffline()
case _ =>
- allPartitions.put(tp, HostedPartition.Offline(None))
+ allPartitions.put(tp, new HostedPartition.Offline(Optional.empty()))
}
}
@@ -2274,29 +2252,29 @@ class ReplicaManager(val config: KafkaConfig,
val partitions = offsetForLeaderTopic.partitions.asScala.map {
offsetForLeaderPartition =>
val tp = new TopicPartition(offsetForLeaderTopic.topic,
offsetForLeaderPartition.partition)
getPartition(tp) match {
- case HostedPartition.Online(partition) =>
+ case online: HostedPartition.Online[Partition] =>
val currentLeaderEpochOpt =
if (offsetForLeaderPartition.currentLeaderEpoch ==
RecordBatch.NO_PARTITION_LEADER_EPOCH)
Optional.empty[Integer]
else
Optional.of[Integer](offsetForLeaderPartition.currentLeaderEpoch)
- partition.lastOffsetForLeaderEpoch(
+ online.partition.lastOffsetForLeaderEpoch(
currentLeaderEpochOpt,
offsetForLeaderPartition.leaderEpoch,
fetchOnlyFromLeader = true)
- case HostedPartition.Offline(_) =>
+ case _: HostedPartition.Offline[Partition] =>
new EpochEndOffset()
.setPartition(offsetForLeaderPartition.partition)
.setErrorCode(Errors.KAFKA_STORAGE_ERROR.code)
- case HostedPartition.None if metadataCache.contains(tp) =>
+ case _: HostedPartition.None[Partition] if
metadataCache.contains(tp) =>
new EpochEndOffset()
.setPartition(offsetForLeaderPartition.partition)
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
- case HostedPartition.None =>
+ case _: HostedPartition.None[Partition] =>
new EpochEndOffset()
.setPartition(offsetForLeaderPartition.partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
@@ -2322,8 +2300,9 @@ class ReplicaManager(val config: KafkaConfig,
delta: TopicsDelta,
topicId: Uuid): Option[(Partition,
Boolean)] = {
getPartition(tp) match {
- case HostedPartition.Offline(offlinePartition) =>
- if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+ case offline: HostedPartition.Offline[Partition] =>
+ val offlinePartition = offline.partition
+ if (offlinePartition.toScala.flatMap(p =>
p.topicId).contains(topicId)) {
stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
s"with topic id $topicId because it resides in an offline log " +
"directory.")
@@ -2333,11 +2312,12 @@ class ReplicaManager(val config: KafkaConfig,
s"A topic with the same name but different id exists but it
resides in an offline log " +
s"directory.")
val partition = Partition(new TopicIdPartition(topicId, tp), time,
this)
- allPartitions.put(tp, HostedPartition.Online(partition))
+ allPartitions.put(tp, new HostedPartition.Online(partition))
Some(partition, true)
}
- case HostedPartition.Online(partition) =>
+ case online: HostedPartition.Online[Partition] =>
+ val partition = online.partition
if (partition.topicId.exists(_ != topicId)) {
// Note: Partition#topicId will be None here if the Log object for
this partition
// has not been created.
@@ -2346,7 +2326,7 @@ class ReplicaManager(val config: KafkaConfig,
}
Some(partition, false)
- case HostedPartition.None =>
+ case _: HostedPartition.None[Partition] =>
if (delta.image().topicsById().containsKey(topicId)) {
stateChangeLogger.error(s"Expected partition $tp with topic id " +
s"$topicId to exist, but it was missing. Creating...")
@@ -2356,7 +2336,7 @@ class ReplicaManager(val config: KafkaConfig,
}
// it's a partition that we don't know about yet, so create it and
mark it online
val partition = Partition(new TopicIdPartition(topicId, tp), time,
this)
- allPartitions.put(tp, HostedPartition.Online(partition))
+ allPartitions.put(tp, new HostedPartition.Online(partition))
Some(partition, true)
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index d868a78e1a7..0b699f9ae08 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -16,6 +16,8 @@
*/
package kafka.server
+import kafka.cluster.Partition
+
import java.net.InetAddress
import java.util
import java.util.concurrent.{CompletableFuture, Executors,
LinkedBlockingQueue, TimeUnit}
@@ -38,6 +40,7 @@ import org.apache.kafka.metadata.{KRaftMetadataCache,
LeaderAndIsr, LeaderRecove
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
+import org.apache.kafka.server.HostedPartition
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
TopicIdPartition}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
@@ -107,7 +110,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
waitUntilTrue(() => {
replicaManager.getPartition(topicPartition) match {
- case HostedPartition.Online(partition) => partition.isLeader
+ case online: HostedPartition.Online[Partition] =>
online.partition.isLeader
case _ => false
}
}, "Timed out waiting for partition to initialize")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 48d4918eec0..d504432a7a5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -65,7 +65,7 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets}
import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey,
DelayedShareFetchKey, ShareFetch}
@@ -1230,7 +1230,7 @@ class ReplicaManagerTest {
val localLog = replicaManager.localLog(topicPartition)
assertTrue(localLog.isDefined, "Log should be created for follower after
applyDelta")
val hostedPartition = replicaManager.getPartition(topicPartition)
- assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online])
+
assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online[Partition]])
// Make local partition a follower - because epoch increased by more
than 1, truncation should
// trigger even though leader does not change
@@ -4029,7 +4029,7 @@ class ReplicaManagerTest {
val leaderImage = imageFromTopics(leaderDelta.apply())
replicaManager.applyDelta(leaderDelta, leaderImage)
-
assertTrue(replicaManager.getPartition(topicPartition).isInstanceOf[HostedPartition.Online])
+
assertTrue(replicaManager.getPartition(topicPartition).isInstanceOf[HostedPartition.Online[Partition]])
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val id = topicIds(topicPartition.topic)
val log = replicaManager.localLog(topicPartition).get
@@ -4133,12 +4133,12 @@ class ReplicaManagerTest {
val hostedPartition = replicaManager.getPartition(topicPartition)
assertEquals(
- classOf[HostedPartition.Offline],
+ classOf[HostedPartition.Offline[Partition]],
hostedPartition.getClass
)
assertEquals(
topicId,
-
hostedPartition.asInstanceOf[HostedPartition.Offline].partition.flatMap(p =>
p.topicId).get
+
hostedPartition.asInstanceOf[HostedPartition.Offline[Partition]].partition.toScala.flatMap(p
=> p.topicId).get
)
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -4452,6 +4452,10 @@ class ReplicaManagerTest {
}
}
+ def getOnlinePartition(hostedPartition: HostedPartition[Partition]):
Partition = {
+ hostedPartition.asInstanceOf[HostedPartition.Online[Partition]].partition()
+ }
+
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeltaFromLeaderToFollower(enableRemoteStorage: Boolean): Unit = {
@@ -4470,7 +4474,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ val leaderPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4502,7 +4506,7 @@ class ReplicaManagerTest {
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
// Check the state of that partition and fetcher
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(1, followerPartition.getLeaderEpoch)
@@ -4534,7 +4538,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
@@ -4569,7 +4573,7 @@ class ReplicaManagerTest {
)
assertEquals(Errors.NONE, leaderResponse.get.error)
- val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ val leaderPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(1, leaderPartition.getLeaderEpoch)
@@ -4598,7 +4602,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
@@ -4615,7 +4619,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check that the state stays the same
- val HostedPartition.Online(noChangePartition) =
replicaManager.getPartition(topicPartition)
+ val noChangePartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(noChangePartition.isLeader)
assertEquals(0, noChangePartition.getLeaderEpoch)
@@ -4645,7 +4649,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
@@ -4670,7 +4674,7 @@ class ReplicaManagerTest {
}
// Check that the partition was removed
- assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
+ assertEquals(new HostedPartition.None[Partition],
replicaManager.getPartition(topicPartition))
assertEquals(None,
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
@@ -4693,7 +4697,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
@@ -4718,7 +4722,7 @@ class ReplicaManagerTest {
}
// Check that the partition was removed
- assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
+ assertEquals(new HostedPartition.None[Partition],
replicaManager.getPartition(topicPartition))
assertEquals(None,
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
@@ -4741,7 +4745,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ val leaderPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4765,7 +4769,7 @@ class ReplicaManagerTest {
}
// Check that the partition was removed
- assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
+ assertEquals(new HostedPartition.None[Partition],
replicaManager.getPartition(topicPartition))
assertEquals(None,
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
@@ -4788,7 +4792,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ val leaderPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4812,7 +4816,7 @@ class ReplicaManagerTest {
}
// Check that the partition was removed
- assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
+ assertEquals(new HostedPartition.None[Partition],
replicaManager.getPartition(topicPartition))
assertEquals(None,
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
@@ -4837,7 +4841,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ val leaderPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4857,7 +4861,7 @@ class ReplicaManagerTest {
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(1, followerPartition.getLeaderEpoch)
@@ -4890,7 +4894,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ val leaderPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4918,7 +4922,7 @@ class ReplicaManagerTest {
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(1, followerPartition.getLeaderEpoch)
@@ -4959,12 +4963,12 @@ class ReplicaManagerTest {
val hostedPartition = replicaManager.getPartition(topicPartition)
assertEquals(
- classOf[HostedPartition.Offline],
+ classOf[HostedPartition.Offline[Partition]],
hostedPartition.getClass
)
assertEquals(
FOO_UUID,
-
hostedPartition.asInstanceOf[HostedPartition.Offline].partition.flatMap(p =>
p.topicId).get
+
hostedPartition.asInstanceOf[HostedPartition.Offline[Partition]].partition.toScala.flatMap(p
=> p.topicId).get
)
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -4998,7 +5002,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
assertEquals(0, followerPartition.localLogOrException.logEndOffset)
@@ -5026,7 +5030,8 @@ class ReplicaManagerTest {
Set(topicPartition))
).thenAnswer { _ =>
replicaManager.getPartition(topicPartition) match {
- case HostedPartition.Online(partition) =>
+ case online: HostedPartition.Online[Partition] =>
+ val partition = online.partition()
partition.appendRecordsToFollowerOrFutureReplica(
records = MemoryRecords.withRecords(
Compression.NONE, 0,
@@ -5098,7 +5103,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition.
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
assertEquals(0, followerPartition.getPartitionEpoch)
@@ -5248,17 +5253,17 @@ class ReplicaManagerTest {
replicaManager.applyDelta(topicsDelta, metadataImage)
// Check the state of the partitions.
- val HostedPartition.Online(fooPartition0) =
replicaManager.getPartition(foo0)
+ val fooPartition0 = getOnlinePartition(replicaManager.getPartition(foo0))
assertFalse(fooPartition0.isLeader)
assertEquals(0, fooPartition0.getLeaderEpoch)
assertEquals(0, fooPartition0.getPartitionEpoch)
- val HostedPartition.Online(fooPartition1) =
replicaManager.getPartition(foo1)
+ val fooPartition1 = getOnlinePartition(replicaManager.getPartition(foo1))
assertTrue(fooPartition1.isLeader)
assertEquals(0, fooPartition1.getLeaderEpoch)
assertEquals(0, fooPartition1.getPartitionEpoch)
- val HostedPartition.Online(fooPartition2) =
replicaManager.getPartition(foo2)
+ val fooPartition2 = getOnlinePartition(replicaManager.getPartition(foo2))
assertFalse(fooPartition2.isLeader)
assertEquals(0, fooPartition2.getLeaderEpoch)
assertEquals(0, fooPartition2.getPartitionEpoch)
@@ -5702,7 +5707,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher.
- val HostedPartition.Online(partition) =
replicaManager.getPartition(topicPartition)
+ val partition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(partition.isLeader)
assertEquals(Set(localId, otherId), partition.inSyncReplicaIds)
assertEquals(0, partition.getLeaderEpoch)
@@ -5732,9 +5737,9 @@ class ReplicaManagerTest {
topicIdPartition: TopicIdPartition
): Unit = {
val partition =
replicaManager.getPartition(topicIdPartition.topicPartition())
- assertTrue(partition.isInstanceOf[HostedPartition.Online],
+ assertTrue(partition.isInstanceOf[HostedPartition.Online[Partition]],
s"Expected ${topicIdPartition} to be in state: HostedPartition.Online.
But was in state: ${partition}")
- val hostedPartition = partition.asInstanceOf[HostedPartition.Online]
+ val hostedPartition =
partition.asInstanceOf[HostedPartition.Online[Partition]]
assertTrue(hostedPartition.partition.log.isDefined,
s"Expected ${topicIdPartition} to have a log set in ReplicaManager, but
it did not.")
assertTrue(hostedPartition.partition.log.get.topicId.isPresent,
@@ -5748,7 +5753,7 @@ class ReplicaManagerTest {
topicIdPartition: TopicIdPartition
): Unit = {
val partition =
replicaManager.getPartition(topicIdPartition.topicPartition())
- assertEquals(HostedPartition.None, partition, s"Expected
${topicIdPartition} to be offline, but it was: ${partition}")
+ assertEquals(new HostedPartition.None[Partition], partition, s"Expected
${topicIdPartition} to be offline, but it was: ${partition}")
}
@Test
@@ -5819,7 +5824,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
- val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ val leaderPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertTrue(leaderPartition.isLeader)
assertEquals(0, leaderPartition.getLeaderEpoch)
// On becoming follower listener should not be invoked yet.
@@ -5834,7 +5839,7 @@ class ReplicaManagerTest {
listener.verify(expectedFollower = true)
// Check the state of that partition.
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ val followerPartition =
getOnlinePartition(replicaManager.getPartition(topicPartition))
assertFalse(followerPartition.isLeader)
assertEquals(1, followerPartition.getLeaderEpoch)
} finally {
diff --git a/server/src/main/java/org/apache/kafka/server/HostedPartition.java
b/server/src/main/java/org/apache/kafka/server/HostedPartition.java
new file mode 100644
index 00000000000..a50bfb8c61e
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/HostedPartition.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import java.util.Optional;
+
+/**
+ * Sealed interface to represent the state of hosted partitions. We create a
concrete (active) Partition
+ * instance when the broker receives a LeaderAndIsr request from the
controller or a metadata
+ * log record from the Quorum controller indicating that the broker should be
either a leader
+ * or follower of a partition.
+ */
+public sealed interface HostedPartition<T> {
+ /**
+ * This broker does not have any state for this partition locally.
+ */
+ record None<T>() implements HostedPartition<T> { }
+ /**
+ * This broker hosts the partition and it is online.
+ */
+ record Online<T>(T partition) implements HostedPartition<T> { }
+ /**
+ * This broker hosts the partition, but it is in an offline log directory.
+ */
+ record Offline<T>(Optional<T> partition) implements HostedPartition<T> { }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 9c1d2793fd0..9174e990dea 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -776,9 +776,9 @@ public class KRaftClusterTest {
TopicPartition topicPartition = new TopicPartition("foo",
partitionId);
var partition =
broker.replicaManager().getPartition(topicPartition);
if (isHosted) {
-
assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition,
"topicPartition = " + topicPartition);
+ assertNotEquals(new
HostedPartition.None<kafka.cluster.Partition>(), partition, "topicPartition = "
+ topicPartition);
} else {
- assertEquals(kafka.server.HostedPartition.None$.MODULE$,
partition, "topicPartition = " + topicPartition);
+ assertEquals(new
HostedPartition.None<kafka.cluster.Partition>(), partition, "topicPartition = "
+ topicPartition);
}
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
index f0bcfc8a22e..95160f68c89 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -17,10 +17,10 @@
package org.apache.kafka.tiered.storage.integration;
import kafka.api.TransactionsTest;
-import kafka.server.HostedPartition;
import kafka.server.KafkaBroker;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.HostedPartition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;