This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 783ab74 KAFKA-8333; Load high watermark checkpoint lazily when
initializing replicas (#6800)
783ab74 is described below
commit 783ab74793cc7e541e6b8ed4e1c545bf5dcff959
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Jun 10 18:59:59 2019 -0700
KAFKA-8333; Load high watermark checkpoint lazily when initializing
replicas (#6800)
Currently we read the high watermark checkpoint file from disk separately for
every replica that we initialize. This patch makes the loading lazy and caches
the loaded offset map while a LeaderAndIsr request is being handled, so each
checkpoint file is read at most once per request.
Reviewers: Jun Rao <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 6 ++--
.../server/checkpoints/OffsetCheckpointFile.scala | 23 +++++++++++---
.../unit/kafka/server/ReplicaManagerTest.scala | 20 ++++++------
.../checkpoints/OffsetCheckpointFileTest.scala | 36 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8b383c2..64b6bee 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,7 +29,7 @@ import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints,
SimpleOffsetCheckpoints}
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile,
OffsetCheckpoints}
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.ElectionType
@@ -604,7 +604,7 @@ class ReplicaManager(val config: KafkaConfig,
// start ReplicaAlterDirThread to move data of this partition from
the current log to the future log
// - Otherwise, return KafkaStorageException. We do not create the
future log while there is offline log directory
// so that we can avoid creating future log for the same partition
in multiple log directories.
- val highWatermarkCheckpoints = new
SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+ val highWatermarkCheckpoints = new
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
if (partition.maybeCreateFutureReplica(destinationDir,
highWatermarkCheckpoints)) {
val futureReplica = futureLocalReplicaOrException(topicPartition)
logManager.abortAndPauseCleaning(topicPartition)
@@ -1109,7 +1109,7 @@ class ReplicaManager(val config: KafkaConfig,
}
val partitionsToBeFollower = partitionState --
partitionsTobeLeader.keys
- val highWatermarkCheckpoints = new
SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+ val highWatermarkCheckpoints = new
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader,
correlationId, responseMap,
highWatermarkCheckpoints)
diff --git
a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 715f42f..69e62d2 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -66,13 +66,26 @@ trait OffsetCheckpoints {
def fetch(logDir: String, topicPartition: TopicPartition): Option[Long]
}
-class SimpleOffsetCheckpoints(checkpointFilesByLogDir: Map[String,
OffsetCheckpointFile])
- extends OffsetCheckpoints {
+/**
+ * Loads checkpoint files on demand and caches the offsets for reuse.
+ */
+class LazyOffsetCheckpoints(checkpointsByLogDir: Map[String,
OffsetCheckpointFile]) extends OffsetCheckpoints {
+ private val lazyCheckpointsByLogDir = checkpointsByLogDir.map { case
(logDir, checkpointFile) =>
+ logDir -> new LazyOffsetCheckpointMap(checkpointFile)
+ }.toMap
override def fetch(logDir: String, topicPartition: TopicPartition):
Option[Long] = {
- val checkpoint = checkpointFilesByLogDir(logDir)
- val offsetMap = checkpoint.read()
- offsetMap.get(topicPartition)
+ val offsetCheckpointFile = lazyCheckpointsByLogDir.getOrElse(logDir,
+ throw new IllegalArgumentException(s"No checkpoint file for log dir
$logDir"))
+ offsetCheckpointFile.fetch(topicPartition)
+ }
+}
+
+class LazyOffsetCheckpointMap(checkpoint: OffsetCheckpointFile) {
+ private lazy val offsets: Map[TopicPartition, Long] = checkpoint.read()
+
+ def fetch(topicPartition: TopicPartition): Option[Long] = {
+ offsets.get(topicPartition)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 59248f0..64a29ed 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager,
ProducerStateManager}
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker
import kafka.cluster.BrokerEndPoint
-import kafka.server.checkpoints.SimpleOffsetCheckpoints
+import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
@@ -87,7 +87,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new
LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
- partition.getOrCreateReplica(1, isNew = false, new
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(1, isNew = false, new
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
// shutdown the replica manager upon test completion
@@ -106,7 +106,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new
LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
- partition.getOrCreateReplica(1, isNew = false, new
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(1, isNew = false, new
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
// shutdown the replica manager upon test completion
@@ -160,7 +160,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0, isNew = false, new
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -204,7 +204,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic,
0))
- partition.getOrCreateReplica(0, isNew = false, new
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
@@ -255,7 +255,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic,
0))
- partition.getOrCreateReplica(0, isNew = false, new
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
@@ -351,7 +351,7 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic,
0))
- partition.getOrCreateReplica(0, isNew = false, new
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
@@ -417,7 +417,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1, 2).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0, isNew = false, new
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
@@ -552,7 +552,7 @@ class ReplicaManagerTest {
// Create 2 partitions, assign replica 0 as the leader for both a
different follower (1 and 2) for each
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
- val offsetCheckpoints = new
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false,
offsetCheckpoints)
replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false,
offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
@@ -645,7 +645,7 @@ class ReplicaManagerTest {
// Initialize partition state to follower, with leader = 1, leaderEpoch = 1
val partition = replicaManager.createPartition(new TopicPartition(topic,
topicPartition))
- val offsetCheckpoints = new
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
partition.getOrCreateReplica(followerBrokerId, isNew = false,
offsetCheckpoints)
partition.makeFollower(controllerId,
leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
diff --git
a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index 2d20674..99a40b6 100644
---
a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++
b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -22,6 +22,8 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.junit.Assert._
import org.junit.Test
+import org.mockito.Mockito
+import org.scalatest.Assertions.assertThrows
import scala.collection.Map
@@ -98,4 +100,38 @@ class OffsetCheckpointFileTest extends Logging {
new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()
}
+ @Test
+ def testLazyOffsetCheckpoint(): Unit = {
+ val logDir = "/tmp/kafka-logs"
+ val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+
+ val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir ->
mockCheckpointFile))
+ Mockito.verify(mockCheckpointFile, Mockito.never()).read()
+
+ val partition0 = new TopicPartition("foo", 0)
+ val partition1 = new TopicPartition("foo", 1)
+ val partition2 = new TopicPartition("foo", 2)
+
+ Mockito.when(mockCheckpointFile.read()).thenReturn(Map(
+ partition0 -> 1000L,
+ partition1 -> 2000L
+ ))
+
+ assertEquals(Some(1000L), lazyCheckpoints.fetch(logDir, partition0))
+ assertEquals(Some(2000L), lazyCheckpoints.fetch(logDir, partition1))
+ assertEquals(None, lazyCheckpoints.fetch(logDir, partition2))
+
+ Mockito.verify(mockCheckpointFile, Mockito.times(1)).read()
+ }
+
+ @Test
+ def testLazyOffsetCheckpointFileInvalidLogDir(): Unit = {
+ val logDir = "/tmp/kafka-logs"
+ val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+ val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir ->
mockCheckpointFile))
+ assertThrows[IllegalArgumentException] {
+ lazyCheckpoints.fetch("/invalid/kafka-logs", new TopicPartition("foo",
0))
+ }
+ }
+
}