This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2cfd28ed8ba KAFKA-19915: [2/3] Move HostedPartition to server module (#21475)
2cfd28ed8ba is described below

commit 2cfd28ed8ba894b6f72cbe476987863d6078fc35
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Feb 18 04:55:21 2026 +0000

    KAFKA-19915: [2/3] Move HostedPartition to server module (#21475)
    
    This PR ports `HostedPartition` from a Scala sealed trait in
    `ReplicaManager` to a Java sealed interface in the server module.
    
    Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-storage.xml              |   3 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 114 +++++++++------------
 .../server/ReplicaManagerConcurrencyTest.scala     |   5 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  79 +++++++-------
 .../org/apache/kafka/server/HostedPartition.java   |  41 ++++++++
 .../org/apache/kafka/server/KRaftClusterTest.java  |   4 +-
 .../TransactionsWithTieredStoreTest.java           |   2 +-
 7 files changed, 138 insertions(+), 110 deletions(-)
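
For orientation before the diff: the short sketch below is editorial and not
part of the commit; the class name, the describe helper, and the use of String
as the type argument (standing in for kafka.cluster.Partition) are all
hypothetical. It shows how Java callers can discriminate the three states of
the new sealed interface with instanceof pattern matching (Java 16+), mirroring
the Scala type-pattern matches introduced throughout ReplicaManager.scala below.

    import java.util.Optional;

    import org.apache.kafka.server.HostedPartition;

    final class HostedPartitionExample {
        // Hypothetical helper; String stands in for kafka.cluster.Partition.
        static String describe(HostedPartition<String> state) {
            if (state instanceof HostedPartition.Online<String> online) {
                return "online: " + online.partition();
            } else if (state instanceof HostedPartition.Offline<String> offline) {
                return "offline: " + offline.partition().orElse("<unknown>");
            } else {
                return "none";
            }
        }

        public static void main(String[] args) {
            System.out.println(describe(new HostedPartition.Online<>("foo-0")));
            System.out.println(describe(new HostedPartition.Offline<>(Optional.of("foo-0"))));
            System.out.println(describe(new HostedPartition.None<>()));
        }
    }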

diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index fb511c4fb79..44f25daf5bc 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -129,8 +129,7 @@
         <allow pkg="org.apache.kafka.metadata" />
         <allow pkg="org.apache.kafka.storage"/>
 
-        <allow pkg="org.apache.kafka.server.log" />
-        <allow pkg="org.apache.kafka.server.config" />
+        <allow pkg="org.apache.kafka.server" />
 
         <allow pkg="org.apache.kafka.test" />
     </subpackage>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6e433ce1526..67dc3dc9463 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,6 @@ package kafka.server
 import com.yammer.metrics.core.Meter
 import kafka.cluster.Partition
 import kafka.log.LogManager
-import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
 import kafka.server.share.DelayedShareFetch
@@ -64,7 +63,7 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, HostedPartition, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RecordValidationStats, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -107,31 +106,6 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
   }
 }
 
-/**
- * Trait to represent the state of hosted partitions. We create a concrete (active) Partition
- * instance when the broker receives a LeaderAndIsr request from the controller or a metadata
- * log record from the Quorum controller indicating that the broker should be either a leader
- * or follower of a partition.
- */
-sealed trait HostedPartition
-
-object HostedPartition {
-  /**
-   * This broker does not have any state for this partition locally.
-   */
-  final object None extends HostedPartition
-
-  /**
-   * This broker hosts the partition and it is online.
-   */
-  final case class Online(partition: Partition) extends HostedPartition
-
-  /**
-   * This broker hosts the partition, but it is in an offline log directory.
-   */
-  final case class Offline(partition: Option[Partition]) extends HostedPartition
-}
-
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
 
@@ -256,7 +230,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   /* epoch of the controller that last changed the leader */
   protected val localBrokerId = config.brokerId
-  protected val allPartitions = new ConcurrentHashMap[TopicPartition, HostedPartition]
+  protected val allPartitions = new ConcurrentHashMap[TopicPartition, HostedPartition[Partition]]
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, quotaManagers.follower)
   private[server] val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
@@ -340,8 +314,9 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeRemoveTopicMetrics(topic: String): Unit = {
     val topicHasNonOfflinePartition = allPartitions.values.asScala.exists {
-      case online: HostedPartition.Online => topic == online.partition.topic
-      case HostedPartition.None | HostedPartition.Offline(_) => false
+      case online: HostedPartition.Online[Partition] => topic == online.partition.topic
+      case _: HostedPartition.Offline[Partition] => false
+      case _: HostedPartition.None[Partition] => false
     }
     if (!topicHasNonOfflinePartition) // nothing online or deferred
       brokerTopicStats.removeMetrics(topic)
@@ -400,8 +375,8 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def maybeAddListener(partition: TopicPartition, listener: PartitionListener): Boolean = {
     getPartition(partition) match {
-      case HostedPartition.Online(partition) =>
-        partition.maybeAddListener(listener)
+      case online: HostedPartition.Online[Partition] =>
+        online.partition.maybeAddListener(listener)
       case _ =>
         false
     }
@@ -412,8 +387,8 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def removeListener(partition: TopicPartition, listener: PartitionListener): Unit = {
     getPartition(partition) match {
-      case HostedPartition.Online(partition) =>
-        partition.removeListener(listener)
+      case online: HostedPartition.Online[Partition] =>
+        online.partition.removeListener(listener)
       case _ => // Ignore
     }
   }
@@ -441,7 +416,7 @@ class ReplicaManager(val config: KafkaConfig,
       var topicId: Option[Uuid] = None
       if (stopPartition.deleteLocalLog) {
         getPartition(topicPartition) match {
-          case hostedPartition: HostedPartition.Online =>
+          case hostedPartition: HostedPartition.Online[Partition] =>
             if (allPartitions.remove(topicPartition, hostedPartition)) {
               maybeRemoveTopicMetrics(topicPartition.topic)
               // Logs are not deleted here. They are deleted in a single batch later on.
@@ -483,13 +458,13 @@ class ReplicaManager(val config: KafkaConfig,
     new TopicIdPartition(topicId, topicPartition)
   }
 
-  def getPartition(topicPartition: TopicPartition): HostedPartition = {
-    Option(allPartitions.get(topicPartition)).getOrElse(HostedPartition.None)
+  def getPartition(topicPartition: TopicPartition): HostedPartition[Partition] = {
+    Option(allPartitions.get(topicPartition)).getOrElse(new HostedPartition.None[Partition])
   }
 
   def isAddingReplica(topicPartition: TopicPartition, replicaId: Int): Boolean = {
     getPartition(topicPartition) match {
-      case Online(partition) => partition.isAddingReplica(replicaId)
+      case online: HostedPartition.Online[Partition] => online.partition.isAddingReplica(replicaId)
       case _ => false
     }
   }
@@ -503,12 +478,12 @@ class ReplicaManager(val config: KafkaConfig,
 
   // Visible for testing
   private[server] def addOnlinePartition(topicPartition: TopicPartition, partition: Partition): Unit = {
-    allPartitions.put(topicPartition, HostedPartition.Online(partition))
+    allPartitions.put(topicPartition, new HostedPartition.Online(partition))
   }
 
   def onlinePartition(topicPartition: TopicPartition): Option[Partition] = {
     getPartition(topicPartition) match {
-      case HostedPartition.Online(partition) => Some(partition)
+      case online: HostedPartition.Online[Partition] => Some(online.partition)
       case _ => None
     }
   }
@@ -517,13 +492,13 @@ class ReplicaManager(val config: KafkaConfig,
   // the iterator has been constructed could still be returned by this iterator.
   private def onlinePartitionsIterator: Iterator[Partition] = {
     allPartitions.values.asScala.iterator.flatMap {
-      case HostedPartition.Online(partition) => Some(partition)
+      case online: HostedPartition.Online[Partition] => Some(online.partition)
       case _ => None
     }
   }
 
   private def offlinePartitionCount: Int = {
-    allPartitions.values.asScala.iterator.count(_.getClass == HostedPartition.Offline.getClass)
+    allPartitions.values.asScala.iterator.count(_.getClass == classOf[HostedPartition.Offline[Partition]])
   }
 
   def getPartitionOrException(topicPartition: TopicPartition): Partition = {
@@ -561,20 +536,20 @@ class ReplicaManager(val config: KafkaConfig,
 
   def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
     getPartition(topicPartition) match {
-      case HostedPartition.Online(partition) =>
-        Right(partition)
+      case online: HostedPartition.Online[Partition] =>
+        Right(online.partition)
 
-      case HostedPartition.Offline(_) =>
+      case _: HostedPartition.Offline[Partition] =>
         Left(Errors.KAFKA_STORAGE_ERROR)
 
-      case HostedPartition.None if metadataCache.contains(topicPartition) =>
+      case _: HostedPartition.None[Partition] if metadataCache.contains(topicPartition) =>
         // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER_OR_FOLLOWER which
         // forces clients to refresh metadata to find the new location. This can happen, for example,
         // during a partition reassignment if a produce request from the client is sent to a broker after
         // the local replica has been deleted.
         Left(Errors.NOT_LEADER_OR_FOLLOWER)
 
-      case HostedPartition.None =>
+      case _: HostedPartition.None[Partition] =>
         Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
     }
   }
@@ -1162,7 +1137,8 @@ class ReplicaManager(val config: KafkaConfig,
             throw new KafkaStorageException(s"Log directory $destinationDir is 
offline")
 
           getPartition(topicPartition) match {
-            case HostedPartition.Online(partition) =>
+            case online: HostedPartition.Online[Partition] =>
+              val partition = online.partition
               // Stop current replica movement if the destinationDir is different from the existing destination log directory
               if (partition.futureReplicaDirChanged(destinationDir)) {
                 replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
@@ -1176,10 +1152,10 @@ class ReplicaManager(val config: KafkaConfig,
                   logManager.resumeCleaning(topicPartition)
                 }
               }
-            case HostedPartition.Offline(_) =>
+            case _: HostedPartition.Offline[Partition] =>
               throw new KafkaStorageException(s"Partition $topicPartition is 
offline")
 
-            case HostedPartition.None => // Do nothing
+            case _: HostedPartition.None[Partition] => // Do nothing
           }
 
           // If the log for this partition has not been created yet:
@@ -1331,7 +1307,8 @@ class ReplicaManager(val config: KafkaConfig,
     if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
       def onAcks(topicPartition: TopicPartition, status: DeleteRecordsPartitionStatus): Unit = {
         val (lowWatermarkReached, error, lw) = getPartition(topicPartition) match {
-          case HostedPartition.Online(partition) =>
+          case online: HostedPartition.Online[Partition] =>
+            val partition = online.partition
             partition.leaderLogIfLocal match {
               case Some(_) =>
                 val leaderLW = partition.lowWatermarkIfLeader
@@ -1340,10 +1317,10 @@ class ReplicaManager(val config: KafkaConfig,
                 (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
             }
 
-          case HostedPartition.Offline(_) =>
+          case _: HostedPartition.Offline[Partition] =>
             (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
 
-          case HostedPartition.None =>
+          case _: HostedPartition.None[Partition] =>
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
         }
         if (error != Errors.NONE || lowWatermarkReached) {
@@ -2152,11 +2129,12 @@ class ReplicaManager(val config: KafkaConfig,
 
   def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
     allPartitions.get(tp) match {
-      case HostedPartition.Online(partition) =>
-        allPartitions.put(tp, HostedPartition.Offline(Some(partition)))
+      case online: HostedPartition.Online[Partition] =>
+        val partition = online.partition
+        allPartitions.put(tp, new HostedPartition.Offline(Optional.of(partition)))
         partition.markOffline()
       case _ =>
-        allPartitions.put(tp, HostedPartition.Offline(None))
+        allPartitions.put(tp, new HostedPartition.Offline(Optional.empty()))
     }
   }
 
@@ -2274,29 +2252,29 @@ class ReplicaManager(val config: KafkaConfig,
       val partitions = offsetForLeaderTopic.partitions.asScala.map { offsetForLeaderPartition =>
         val tp = new TopicPartition(offsetForLeaderTopic.topic, offsetForLeaderPartition.partition)
         getPartition(tp) match {
-          case HostedPartition.Online(partition) =>
+          case online: HostedPartition.Online[Partition] =>
             val currentLeaderEpochOpt =
               if (offsetForLeaderPartition.currentLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
                 Optional.empty[Integer]
               else
                 Optional.of[Integer](offsetForLeaderPartition.currentLeaderEpoch)
 
-            partition.lastOffsetForLeaderEpoch(
+            online.partition.lastOffsetForLeaderEpoch(
               currentLeaderEpochOpt,
               offsetForLeaderPartition.leaderEpoch,
               fetchOnlyFromLeader = true)
 
-          case HostedPartition.Offline(_) =>
+          case _: HostedPartition.Offline[Partition] =>
             new EpochEndOffset()
               .setPartition(offsetForLeaderPartition.partition)
               .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code)
 
-          case HostedPartition.None if metadataCache.contains(tp) =>
+          case _: HostedPartition.None[Partition] if metadataCache.contains(tp) =>
             new EpochEndOffset()
               .setPartition(offsetForLeaderPartition.partition)
               .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
 
-          case HostedPartition.None =>
+          case _: HostedPartition.None[Partition] =>
             new EpochEndOffset()
               .setPartition(offsetForLeaderPartition.partition)
               .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
@@ -2322,8 +2300,9 @@ class ReplicaManager(val config: KafkaConfig,
                                           delta: TopicsDelta,
                                           topicId: Uuid): Option[(Partition, Boolean)] = {
     getPartition(tp) match {
-      case HostedPartition.Offline(offlinePartition) =>
-        if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+      case offline: HostedPartition.Offline[Partition] =>
+        val offlinePartition = offline.partition
+        if (offlinePartition.toScala.flatMap(p => p.topicId).contains(topicId)) {
           stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
             s"with topic id $topicId because it resides in an offline log " +
             "directory.")
@@ -2333,11 +2312,12 @@ class ReplicaManager(val config: KafkaConfig,
             s"A topic with the same name but different id exists but it 
resides in an offline log " +
             s"directory.")
           val partition = Partition(new TopicIdPartition(topicId, tp), time, 
this)
-          allPartitions.put(tp, HostedPartition.Online(partition))
+          allPartitions.put(tp, new HostedPartition.Online(partition))
           Some(partition, true)
         }
 
-      case HostedPartition.Online(partition) =>
+      case online: HostedPartition.Online[Partition] =>
+        val partition = online.partition
         if (partition.topicId.exists(_ != topicId)) {
           // Note: Partition#topicId will be None here if the Log object for this partition
           // has not been created.
@@ -2346,7 +2326,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
         Some(partition, false)
 
-      case HostedPartition.None =>
+      case _: HostedPartition.None[Partition] =>
         if (delta.image().topicsById().containsKey(topicId)) {
           stateChangeLogger.error(s"Expected partition $tp with topic id " +
             s"$topicId to exist, but it was missing. Creating...")
@@ -2356,7 +2336,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
         // it's a partition that we don't know about yet, so create it and mark it online
         val partition = Partition(new TopicIdPartition(topicId, tp), time, this)
-        allPartitions.put(tp, HostedPartition.Online(partition))
+        allPartitions.put(tp, new HostedPartition.Online(partition))
         Some(partition, true)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index d868a78e1a7..0b699f9ae08 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.server
 
+import kafka.cluster.Partition
+
 import java.net.InetAddress
 import java.util
 import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue, TimeUnit}
@@ -38,6 +40,7 @@ import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderAndIsr, LeaderRecove
 import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
+import org.apache.kafka.server.HostedPartition
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, TopicIdPartition}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
@@ -107,7 +110,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
 
     waitUntilTrue(() => {
       replicaManager.getPartition(topicPartition) match {
-        case HostedPartition.Online(partition) => partition.isLeader
+        case online: HostedPartition.Online[Partition] => online.partition.isLeader
         case _ => false
       }
     }, "Timed out waiting for partition to initialize")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 48d4918eec0..d504432a7a5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -65,7 +65,7 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets}
 import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
@@ -1230,7 +1230,7 @@ class ReplicaManagerTest {
       val localLog = replicaManager.localLog(topicPartition)
       assertTrue(localLog.isDefined, "Log should be created for follower after applyDelta")
       val hostedPartition = replicaManager.getPartition(topicPartition)
-      assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online])
+      assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online[Partition]])
 
       // Make local partition a follower - because epoch increased by more than 1, truncation should
       // trigger even though leader does not change
@@ -4029,7 +4029,7 @@ class ReplicaManagerTest {
       val leaderImage = imageFromTopics(leaderDelta.apply())
       replicaManager.applyDelta(leaderDelta, leaderImage)
 
-      assertTrue(replicaManager.getPartition(topicPartition).isInstanceOf[HostedPartition.Online])
+      assertTrue(replicaManager.getPartition(topicPartition).isInstanceOf[HostedPartition.Online[Partition]])
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val id = topicIds(topicPartition.topic)
       val log = replicaManager.localLog(topicPartition).get
@@ -4133,12 +4133,12 @@ class ReplicaManagerTest {
 
       val hostedPartition = replicaManager.getPartition(topicPartition)
       assertEquals(
-        classOf[HostedPartition.Offline],
+        classOf[HostedPartition.Offline[Partition]],
         hostedPartition.getClass
       )
       assertEquals(
         topicId,
-        hostedPartition.asInstanceOf[HostedPartition.Offline].partition.flatMap(p => p.topicId).get
+        hostedPartition.asInstanceOf[HostedPartition.Offline[Partition]].partition.toScala.flatMap(p => p.topicId).get
       )
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -4452,6 +4452,10 @@ class ReplicaManagerTest {
     }
   }
 
+  def getOnlinePartition(hostedPartition: HostedPartition[Partition]): Partition = {
+    hostedPartition.asInstanceOf[HostedPartition.Online[Partition]].partition()
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testDeltaFromLeaderToFollower(enableRemoteStorage: Boolean): Unit = {
@@ -4470,7 +4474,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
       assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4502,7 +4506,7 @@ class ReplicaManagerTest {
       assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(1, followerPartition.getLeaderEpoch)
 
@@ -4534,7 +4538,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(0, followerPartition.getLeaderEpoch)
 
@@ -4569,7 +4573,7 @@ class ReplicaManagerTest {
       )
       assertEquals(Errors.NONE, leaderResponse.get.error)
 
-      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
       assertEquals(1, leaderPartition.getLeaderEpoch)
@@ -4598,7 +4602,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(0, followerPartition.getLeaderEpoch)
 
@@ -4615,7 +4619,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
       // Check that the state stays the same
-      val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition)
+      val noChangePartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(noChangePartition.isLeader)
       assertEquals(0, noChangePartition.getLeaderEpoch)
 
@@ -4645,7 +4649,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(0, followerPartition.getLeaderEpoch)
 
@@ -4670,7 +4674,7 @@ class ReplicaManagerTest {
       }
 
       // Check that the partition was removed
-      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(new HostedPartition.None[Partition], replicaManager.getPartition(topicPartition))
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
       assertEquals(None, replicaManager.logManager.getLog(topicPartition))
     } finally {
@@ -4693,7 +4697,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(0, followerPartition.getLeaderEpoch)
 
@@ -4718,7 +4722,7 @@ class ReplicaManagerTest {
       }
 
       // Check that the partition was removed
-      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(new HostedPartition.None[Partition], replicaManager.getPartition(topicPartition))
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
       assertEquals(None, replicaManager.logManager.getLog(topicPartition))
     } finally {
@@ -4741,7 +4745,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
       assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4765,7 +4769,7 @@ class ReplicaManagerTest {
       }
 
       // Check that the partition was removed
-      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(new HostedPartition.None[Partition], replicaManager.getPartition(topicPartition))
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
       assertEquals(None, replicaManager.logManager.getLog(topicPartition))
     } finally {
@@ -4788,7 +4792,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
       assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4812,7 +4816,7 @@ class ReplicaManagerTest {
       }
 
       // Check that the partition was removed
-      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(new HostedPartition.None[Partition], replicaManager.getPartition(topicPartition))
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
       assertEquals(None, replicaManager.logManager.getLog(topicPartition))
     } finally {
@@ -4837,7 +4841,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
       assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4857,7 +4861,7 @@ class ReplicaManagerTest {
       val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(1, followerPartition.getLeaderEpoch)
 
@@ -4890,7 +4894,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
       assertEquals(0, leaderPartition.getLeaderEpoch)
@@ -4918,7 +4922,7 @@ class ReplicaManagerTest {
       val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(1, followerPartition.getLeaderEpoch)
 
@@ -4959,12 +4963,12 @@ class ReplicaManagerTest {
 
       val hostedPartition = replicaManager.getPartition(topicPartition)
       assertEquals(
-        classOf[HostedPartition.Offline],
+        classOf[HostedPartition.Offline[Partition]],
         hostedPartition.getClass
       )
       assertEquals(
         FOO_UUID,
-        hostedPartition.asInstanceOf[HostedPartition.Offline].partition.flatMap(p => p.topicId).get
+        hostedPartition.asInstanceOf[HostedPartition.Offline[Partition]].partition.toScala.flatMap(p => p.topicId).get
       )
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -4998,7 +5002,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
       // Check the state of that partition
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(0, followerPartition.getLeaderEpoch)
       assertEquals(0, followerPartition.localLogOrException.logEndOffset)
@@ -5026,7 +5030,8 @@ class ReplicaManagerTest {
         Set(topicPartition))
       ).thenAnswer { _ =>
         replicaManager.getPartition(topicPartition) match {
-          case HostedPartition.Online(partition) =>
+          case online: HostedPartition.Online[Partition] =>
+            val partition = online.partition()
             partition.appendRecordsToFollowerOrFutureReplica(
               records = MemoryRecords.withRecords(
                 Compression.NONE, 0,
@@ -5098,7 +5103,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
       // Check the state of that partition.
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(0, followerPartition.getLeaderEpoch)
       assertEquals(0, followerPartition.getPartitionEpoch)
@@ -5248,17 +5253,17 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(topicsDelta, metadataImage)
 
       // Check the state of the partitions.
-      val HostedPartition.Online(fooPartition0) = replicaManager.getPartition(foo0)
+      val fooPartition0 = getOnlinePartition(replicaManager.getPartition(foo0))
       assertFalse(fooPartition0.isLeader)
       assertEquals(0, fooPartition0.getLeaderEpoch)
       assertEquals(0, fooPartition0.getPartitionEpoch)
 
-      val HostedPartition.Online(fooPartition1) = replicaManager.getPartition(foo1)
+      val fooPartition1 = getOnlinePartition(replicaManager.getPartition(foo1))
       assertTrue(fooPartition1.isLeader)
       assertEquals(0, fooPartition1.getLeaderEpoch)
       assertEquals(0, fooPartition1.getPartitionEpoch)
 
-      val HostedPartition.Online(fooPartition2) = replicaManager.getPartition(foo2)
+      val fooPartition2 = getOnlinePartition(replicaManager.getPartition(foo2))
       assertFalse(fooPartition2.isLeader)
       assertEquals(0, fooPartition2.getLeaderEpoch)
       assertEquals(0, fooPartition2.getPartitionEpoch)
@@ -5702,7 +5707,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Check the state of that partition and fetcher.
-      val HostedPartition.Online(partition) = replicaManager.getPartition(topicPartition)
+      val partition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(partition.isLeader)
       assertEquals(Set(localId, otherId), partition.inSyncReplicaIds)
       assertEquals(0, partition.getLeaderEpoch)
@@ -5732,9 +5737,9 @@ class ReplicaManagerTest {
     topicIdPartition: TopicIdPartition
   ): Unit = {
     val partition = replicaManager.getPartition(topicIdPartition.topicPartition())
-    assertTrue(partition.isInstanceOf[HostedPartition.Online],
+    assertTrue(partition.isInstanceOf[HostedPartition.Online[Partition]],
       s"Expected ${topicIdPartition} to be in state: HostedPartition.Online. 
But was in state: ${partition}")
-    val hostedPartition = partition.asInstanceOf[HostedPartition.Online]
+    val hostedPartition = partition.asInstanceOf[HostedPartition.Online[Partition]]
     assertTrue(hostedPartition.partition.log.isDefined,
       s"Expected ${topicIdPartition} to have a log set in ReplicaManager, but 
it did not.")
     assertTrue(hostedPartition.partition.log.get.topicId.isPresent,
@@ -5748,7 +5753,7 @@ class ReplicaManagerTest {
     topicIdPartition: TopicIdPartition
   ): Unit = {
     val partition = replicaManager.getPartition(topicIdPartition.topicPartition())
-    assertEquals(HostedPartition.None, partition, s"Expected ${topicIdPartition} to be offline, but it was: ${partition}")
+    assertEquals(new HostedPartition.None[Partition], partition, s"Expected ${topicIdPartition} to be offline, but it was: ${partition}")
   }
 
   @Test
@@ -5819,7 +5824,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Check the state of that partition and fetcher
-      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
       assertEquals(0, leaderPartition.getLeaderEpoch)
       // On becoming follower listener should not be invoked yet.
@@ -5834,7 +5839,7 @@ class ReplicaManagerTest {
       listener.verify(expectedFollower = true)
 
       // Check the state of that partition.
-      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertFalse(followerPartition.isLeader)
       assertEquals(1, followerPartition.getLeaderEpoch)
     } finally {
diff --git a/server/src/main/java/org/apache/kafka/server/HostedPartition.java b/server/src/main/java/org/apache/kafka/server/HostedPartition.java
new file mode 100644
index 00000000000..a50bfb8c61e
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/HostedPartition.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import java.util.Optional;
+
+/**
+ * Sealed interface to represent the state of hosted partitions. We create a concrete (active) Partition
+ * instance when the broker receives a LeaderAndIsr request from the controller or a metadata
+ * log record from the Quorum controller indicating that the broker should be either a leader
+ * or follower of a partition.
+ */
+public sealed interface HostedPartition<T> {
+    /**
+     * This broker does not have any state for this partition locally.
+     */
+    record None<T>() implements HostedPartition<T> { }
+    /**
+     * This broker hosts the partition and it is online.
+     */
+    record Online<T>(T partition) implements HostedPartition<T> { }
+    /**
+     * This broker hosts the partition, but it is in an offline log directory.
+     */
+    record Offline<T>(Optional<T> partition) implements HostedPartition<T> { }
+}
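
A brief editorial aside on the record-based design (an observation, not part of
the commit): because None, Online, and Offline are records, equals and hashCode
are structural, which is why the updated tests can assert against a freshly
constructed None where the Scala version compared against the None singleton
object (None$.MODULE$). A minimal sketch, again using String as a hypothetical
stand-in type argument:

    import org.apache.kafka.server.HostedPartition;

    final class RecordEqualityExample {
        public static void main(String[] args) {
            HostedPartition<String> a = new HostedPartition.None<>();
            HostedPartition<String> b = new HostedPartition.None<>();
            // Records derive equals/hashCode from their components, so two
            // None instances (which have no components) always compare equal.
            System.out.println(a.equals(b));                  // true
            System.out.println(a.hashCode() == b.hashCode()); // true
        }
    }
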
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 9c1d2793fd0..9174e990dea 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -776,9 +776,9 @@ public class KRaftClusterTest {
                 TopicPartition topicPartition = new TopicPartition("foo", partitionId);
                 var partition = broker.replicaManager().getPartition(topicPartition);
                 if (isHosted) {
-                    assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition, "topicPartition = " + topicPartition);
+                    assertNotEquals(new HostedPartition.None<kafka.cluster.Partition>(), partition, "topicPartition = " + topicPartition);
                 } else {
-                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, partition, "topicPartition = " + topicPartition);
+                    assertEquals(new HostedPartition.None<kafka.cluster.Partition>(), partition, "topicPartition = " + topicPartition);
                 }
             }
         }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
index f0bcfc8a22e..95160f68c89 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.tiered.storage.integration;
 
 import kafka.api.TransactionsTest;
-import kafka.server.HostedPartition;
 import kafka.server.KafkaBroker;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.HostedPartition;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
 

