This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1cc0b5e MINOR: Seal the HostedPartition enumeration (#6917)
1cc0b5e is described below
commit 1cc0b5eb81b838c4ffb6ff2c2d3f16b7b3754a01
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jun 12 09:38:21 2019 -0700
MINOR: Seal the HostedPartition enumeration (#6917)
Makes HostedPartition a sealed trait and makes all of the match cases
explicit.
Reviewers: Vikas Singh <[email protected]>, Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../scala/kafka/server/DelayedDeleteRecords.scala | 5 +--
.../main/scala/kafka/server/DelayedProduce.scala | 5 +--
.../main/scala/kafka/server/ReplicaManager.scala | 52 ++++++++++++----------
.../coordinator/group/GroupCoordinatorTest.scala | 6 +--
.../group/GroupMetadataManagerTest.scala | 22 +++++----
6 files changed, 46 insertions(+), 46 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 22d9dff..251c570 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -165,7 +165,7 @@ class Partition(val topicPartition: TopicPartition,
private val stateStore: PartitionStateStore,
private val delayedOperations: DelayedOperations,
private val metadataCache: MetadataCache,
- private val logManager: LogManager) extends HostedPartition
with Logging with KafkaMetricsGroup {
+ private val logManager: LogManager) extends Logging with
KafkaMetricsGroup {
def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index dac9f79..236c8d1 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,10 +20,9 @@ package kafka.server
import java.util.concurrent.TimeUnit
-import kafka.cluster.Partition
import kafka.metrics.KafkaMetricsGroup
-import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.DeleteRecordsResponse
import scala.collection._
@@ -74,7 +73,7 @@ class DelayedDeleteRecords(delayMs: Long,
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (lowWatermarkReached, error, lw) =
replicaManager.getPartition(topicPartition) match {
- case partition: Partition =>
+ case HostedPartition.Online(partition) =>
partition.leaderReplicaIfLocal match {
case Some(_) =>
val leaderLW = partition.lowWatermarkIfLeader
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 1570d4b..f1d1407 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -22,11 +22,10 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Meter
-import kafka.cluster.Partition
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Pool
-import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import scala.collection._
@@ -88,7 +87,7 @@ class DelayedProduce(delayMs: Long,
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (hasEnough, error) = replicaManager.getPartition(topicPartition)
match {
- case partition: Partition =>
+ case HostedPartition.Online(partition) =>
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
case HostedPartition.Offline =>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 64b6bee..7ded985 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,21 +114,24 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
* instance when the broker receives a LeaderAndIsr request from the
controller indicating
* that it should be either a leader or follower of a partition.
*/
-trait HostedPartition
-
+sealed trait HostedPartition
object HostedPartition {
/**
* This broker does not have any state for this partition locally.
*/
- object None extends HostedPartition
+ final object None extends HostedPartition
+
+ /**
+ * This broker hosts the partition and it is online.
+ */
+ final case class Online(partition: Partition) extends HostedPartition
/**
* This broker hosts the partition, but it is in an offline log directory.
*/
- object Offline extends HostedPartition
+ final object Offline extends HostedPartition
}
-
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
val IsrChangePropagationBlackOut = 5000L
@@ -183,8 +186,9 @@ class ReplicaManager(val config: KafkaConfig,
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
private val localBrokerId = config.brokerId
- private val allPartitions = new Pool[TopicPartition,
HostedPartition](valueFactory = Some(tp =>
- Partition(tp, time, this)))
+ private val allPartitions = new Pool[TopicPartition, HostedPartition](
+ valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time,
this)))
+ )
private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager(metrics, time,
threadNamePrefix, quotaManagers.follower)
val replicaAlterLogDirsManager =
createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
@@ -320,8 +324,8 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeRemoveTopicMetrics(topic: String): Unit = {
val topicHasOnlinePartition = allPartitions.values.exists {
- case partition: Partition => topic == partition.topic
- case _ => false
+ case HostedPartition.Online(partition) => topic == partition.topic
+ case HostedPartition.None | HostedPartition.Offline => false
}
if (!topicHasOnlinePartition)
brokerTopicStats.removeMetrics(topic)
@@ -335,8 +339,8 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.Offline =>
throw new KafkaStorageException(s"Partition $topicPartition is on an
offline disk")
- case removedPartition: Partition =>
- if (allPartitions.remove(topicPartition, removedPartition)) {
+ case hostedPartition @ HostedPartition.Online(removedPartition) =>
+ if (allPartitions.remove(topicPartition, hostedPartition)) {
maybeRemoveTopicMetrics(topicPartition.topic)
// this will delete the local log. This call may throw exception
if the log is on offline directory
removedPartition.delete()
@@ -393,14 +397,14 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing
def createPartition(topicPartition: TopicPartition): Partition = {
val partition = Partition(topicPartition, time, this)
- allPartitions.put(topicPartition, partition)
+ allPartitions.put(topicPartition, HostedPartition.Online(partition))
partition
}
def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
{
getPartition(topicPartition) match {
- case partition: Partition => Some(partition)
- case _ => None
+ case HostedPartition.Online(partition) => Some(partition)
+ case HostedPartition.None | HostedPartition.Offline => None
}
}
@@ -408,8 +412,8 @@ class ReplicaManager(val config: KafkaConfig,
// the iterator has been constructed could still be returned by this
iterator.
private def nonOfflinePartitionsIterator: Iterator[Partition] = {
allPartitions.values.iterator.flatMap {
- case p: Partition => Some(p)
- case _ => None
+ case HostedPartition.Online(partition) => Some(partition)
+ case HostedPartition.None | HostedPartition.Offline => None
}
}
@@ -419,7 +423,7 @@ class ReplicaManager(val config: KafkaConfig,
def getPartitionOrException(topicPartition: TopicPartition, expectLeader:
Boolean): Partition = {
getPartition(topicPartition) match {
- case partition: Partition =>
+ case HostedPartition.Online(partition) =>
partition
case HostedPartition.Offline =>
@@ -577,7 +581,7 @@ class ReplicaManager(val config: KafkaConfig,
throw new KafkaStorageException(s"Log directory $destinationDir is
offline")
getPartition(topicPartition) match {
- case partition: Partition =>
+ case HostedPartition.Online(partition) =>
// Stop current replica movement if the destinationDir is
different from the existing destination log directory
if (partition.futureReplicaDirChanged(destinationDir)) {
replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
@@ -586,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.Offline =>
throw new KafkaStorageException(s"Partition $topicPartition is
offline")
- case _ => // Do nothing
+ case HostedPartition.None => // Do nothing
}
// If the log for this partition has not been created yet:
@@ -776,8 +780,8 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition,
LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
case t: Throwable =>
val logStartOffset = getPartition(topicPartition) match {
- case partition: Partition => partition.logStartOffset
- case _ => -1L
+ case HostedPartition.Online(partition) =>
partition.logStartOffset
+ case HostedPartition.None | HostedPartition.Offline => -1L
}
brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
@@ -1064,11 +1068,11 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
None
- case partition: Partition => Some(partition)
+ case HostedPartition.Online(partition) => Some(partition)
case HostedPartition.None =>
val partition = Partition(topicPartition, time, this)
- allPartitions.putIfNotExists(topicPartition, partition)
+ allPartitions.putIfNotExists(topicPartition,
HostedPartition.Online(partition))
newPartitions.add(partition)
Some(partition)
}
@@ -1534,7 +1538,7 @@ class ReplicaManager(val config: KafkaConfig,
def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition,
OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition,
EpochEndOffset] = {
requestedEpochInfo.map { case (tp, partitionData) =>
val epochEndOffset = getPartition(tp) match {
- case partition: Partition =>
+ case HostedPartition.Online(partition) =>
partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch,
partitionData.leaderEpoch,
fetchOnlyFromLeader = true)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c12e19d..d5d33fb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -2017,7 +2017,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
@@ -2558,7 +2558,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
@@ -2599,7 +2599,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 0487178..f9b571e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -17,15 +17,24 @@
package kafka.coordinator.group
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
+import java.nio.ByteBuffer
+import java.util.Collections
+import java.util.Optional
+import java.util.concurrent.locks.ReentrantLock
import kafka.api._
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.{Log, LogAppendInfo}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata,
ReplicaManager}
+import kafka.server.HostedPartition
import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
+import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -34,19 +43,8 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
import org.junit.{Before, Test}
import org.scalatest.Assertions.fail
-import java.nio.ByteBuffer
-import java.util.Collections
-import java.util.Optional
-
-import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.internals.Topic
-
import scala.collection.JavaConverters._
import scala.collection._
-import java.util.concurrent.locks.ReentrantLock
-
-import kafka.zk.KafkaZkClient
class GroupMetadataManagerTest {
@@ -2025,7 +2023,7 @@ class GroupMetadataManagerTest {
}
private def mockGetPartition(): Unit = {
-
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
+
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition))
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
}