This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1cc0b5e  MINOR: Seal the HostedPartition enumeration (#6917)
1cc0b5e is described below

commit 1cc0b5eb81b838c4ffb6ff2c2d3f16b7b3754a01
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jun 12 09:38:21 2019 -0700

    MINOR: Seal the HostedPartition enumeration (#6917)
    
    Makes HostedPartition a sealed trait and makes all of the match cases 
explicit.
    
    Reviewers: Vikas Singh <[email protected]>, Jason 
Gustafson <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  2 +-
 .../scala/kafka/server/DelayedDeleteRecords.scala  |  5 +--
 .../main/scala/kafka/server/DelayedProduce.scala   |  5 +--
 .../main/scala/kafka/server/ReplicaManager.scala   | 52 ++++++++++++----------
 .../coordinator/group/GroupCoordinatorTest.scala   |  6 +--
 .../group/GroupMetadataManagerTest.scala           | 22 +++++----
 6 files changed, 46 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 22d9dff..251c570 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -165,7 +165,7 @@ class Partition(val topicPartition: TopicPartition,
                 private val stateStore: PartitionStateStore,
                 private val delayedOperations: DelayedOperations,
                 private val metadataCache: MetadataCache,
-                private val logManager: LogManager) extends HostedPartition 
with Logging with KafkaMetricsGroup {
+                private val logManager: LogManager) extends Logging with 
KafkaMetricsGroup {
 
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala 
b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index dac9f79..236c8d1 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,10 +20,9 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
-import kafka.cluster.Partition
 import kafka.metrics.KafkaMetricsGroup
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.DeleteRecordsResponse
 
 import scala.collection._
@@ -74,7 +73,7 @@ class DelayedDeleteRecords(delayMs: Long,
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
         val (lowWatermarkReached, error, lw) = 
replicaManager.getPartition(topicPartition) match {
-          case partition: Partition =>
+          case HostedPartition.Online(partition) =>
             partition.leaderReplicaIfLocal match {
               case Some(_) =>
                 val leaderLW = partition.lowWatermarkIfLeader
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala 
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 1570d4b..f1d1407 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -22,11 +22,10 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
-import kafka.cluster.Partition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 
 import scala.collection._
@@ -88,7 +87,7 @@ class DelayedProduce(delayMs: Long,
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
         val (hasEnough, error) = replicaManager.getPartition(topicPartition) 
match {
-          case partition: Partition  =>
+          case HostedPartition.Online(partition) =>
             partition.checkEnoughReplicasReachOffset(status.requiredOffset)
 
           case HostedPartition.Offline =>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 64b6bee..7ded985 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,21 +114,24 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
  * instance when the broker receives a LeaderAndIsr request from the 
controller indicating
  * that it should be either a leader or follower of a partition.
  */
-trait HostedPartition
-
+sealed trait HostedPartition
 object HostedPartition {
   /**
    * This broker does not have any state for this partition locally.
    */
-  object None extends HostedPartition
+  final object None extends HostedPartition
+
+  /**
+   * This broker hosts the partition and it is online.
+   */
+  final case class Online(partition: Partition) extends HostedPartition
 
   /**
    * This broker hosts the partition, but it is in an offline log directory.
    */
-  object Offline extends HostedPartition
+  final object Offline extends HostedPartition
 }
 
-
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
   val IsrChangePropagationBlackOut = 5000L
@@ -183,8 +186,9 @@ class ReplicaManager(val config: KafkaConfig,
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   private val localBrokerId = config.brokerId
-  private val allPartitions = new Pool[TopicPartition, 
HostedPartition](valueFactory = Some(tp =>
-    Partition(tp, time, this)))
+  private val allPartitions = new Pool[TopicPartition, HostedPartition](
+    valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, 
this)))
+  )
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, 
threadNamePrefix, quotaManagers.follower)
   val replicaAlterLogDirsManager = 
createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
@@ -320,8 +324,8 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeRemoveTopicMetrics(topic: String): Unit = {
     val topicHasOnlinePartition = allPartitions.values.exists {
-      case partition: Partition => topic == partition.topic
-      case _ => false
+      case HostedPartition.Online(partition) => topic == partition.topic
+      case HostedPartition.None | HostedPartition.Offline => false
     }
     if (!topicHasOnlinePartition)
       brokerTopicStats.removeMetrics(topic)
@@ -335,8 +339,8 @@ class ReplicaManager(val config: KafkaConfig,
         case HostedPartition.Offline =>
           throw new KafkaStorageException(s"Partition $topicPartition is on an 
offline disk")
 
-        case removedPartition: Partition =>
-          if (allPartitions.remove(topicPartition, removedPartition)) {
+        case hostedPartition @ HostedPartition.Online(removedPartition) =>
+          if (allPartitions.remove(topicPartition, hostedPartition)) {
             maybeRemoveTopicMetrics(topicPartition.topic)
             // this will delete the local log. This call may throw exception 
if the log is on offline directory
             removedPartition.delete()
@@ -393,14 +397,14 @@ class ReplicaManager(val config: KafkaConfig,
   // Visible for testing
   def createPartition(topicPartition: TopicPartition): Partition = {
     val partition = Partition(topicPartition, time, this)
-    allPartitions.put(topicPartition, partition)
+    allPartitions.put(topicPartition, HostedPartition.Online(partition))
     partition
   }
 
   def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = 
{
     getPartition(topicPartition) match {
-      case partition: Partition => Some(partition)
-      case _ => None
+      case HostedPartition.Online(partition) => Some(partition)
+      case HostedPartition.None | HostedPartition.Offline => None
     }
   }
 
@@ -408,8 +412,8 @@ class ReplicaManager(val config: KafkaConfig,
   // the iterator has been constructed could still be returned by this 
iterator.
   private def nonOfflinePartitionsIterator: Iterator[Partition] = {
     allPartitions.values.iterator.flatMap {
-      case p: Partition => Some(p)
-      case _ => None
+      case HostedPartition.Online(partition) => Some(partition)
+      case HostedPartition.None | HostedPartition.Offline => None
     }
   }
 
@@ -419,7 +423,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def getPartitionOrException(topicPartition: TopicPartition, expectLeader: 
Boolean): Partition = {
     getPartition(topicPartition) match {
-      case partition: Partition =>
+      case HostedPartition.Online(partition) =>
         partition
 
       case HostedPartition.Offline =>
@@ -577,7 +581,7 @@ class ReplicaManager(val config: KafkaConfig,
             throw new KafkaStorageException(s"Log directory $destinationDir is 
offline")
 
           getPartition(topicPartition) match {
-            case partition: Partition =>
+            case HostedPartition.Online(partition) =>
               // Stop current replica movement if the destinationDir is 
different from the existing destination log directory
               if (partition.futureReplicaDirChanged(destinationDir)) {
                 
replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
@@ -586,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.Offline =>
               throw new KafkaStorageException(s"Partition $topicPartition is 
offline")
 
-            case _ => // Do nothing
+            case HostedPartition.None => // Do nothing
           }
 
           // If the log for this partition has not been created yet:
@@ -776,8 +780,8 @@ class ReplicaManager(val config: KafkaConfig,
             (topicPartition, 
LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
             val logStartOffset = getPartition(topicPartition) match {
-              case partition: Partition => partition.logStartOffset
-              case _ => -1L
+              case HostedPartition.Online(partition) => 
partition.logStartOffset
+              case HostedPartition.None | HostedPartition.Offline => -1L
             }
             
brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
             brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
@@ -1064,11 +1068,11 @@ class ReplicaManager(val config: KafkaConfig,
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
               None
 
-            case partition: Partition => Some(partition)
+            case HostedPartition.Online(partition) => Some(partition)
 
             case HostedPartition.None =>
               val partition = Partition(topicPartition, time, this)
-              allPartitions.putIfNotExists(topicPartition, partition)
+              allPartitions.putIfNotExists(topicPartition, 
HostedPartition.Online(partition))
               newPartitions.add(partition)
               Some(partition)
           }
@@ -1534,7 +1538,7 @@ class ReplicaManager(val config: KafkaConfig,
   def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, 
OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, 
EpochEndOffset] = {
     requestedEpochInfo.map { case (tp, partitionData) =>
       val epochEndOffset = getPartition(tp) match {
-        case partition: Partition =>
+        case HostedPartition.Online(partition) =>
           partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, 
partitionData.leaderEpoch,
             fetchOnlyFromLeader = true)
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c12e19d..d5d33fb 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -2017,7 +2017,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
     
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
@@ -2558,7 +2558,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
     
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
@@ -2599,7 +2599,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
     
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 0487178..f9b571e 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -17,15 +17,24 @@
 
 package kafka.coordinator.group
 
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
+import java.nio.ByteBuffer
+import java.util.Collections
+import java.util.Optional
+import java.util.concurrent.locks.ReentrantLock
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.log.{Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, 
ReplicaManager}
+import kafka.server.HostedPartition
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -34,19 +43,8 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{Before, Test}
 import org.scalatest.Assertions.fail
-import java.nio.ByteBuffer
-import java.util.Collections
-import java.util.Optional
-
-import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.internals.Topic
-
 import scala.collection.JavaConverters._
 import scala.collection._
-import java.util.concurrent.locks.ReentrantLock
-
-import kafka.zk.KafkaZkClient
 
 class GroupMetadataManagerTest {
 
@@ -2025,7 +2023,7 @@ class GroupMetadataManagerTest {
   }
 
   private def mockGetPartition(): Unit = {
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
     
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
   }
 

Reply via email to