This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 783ab74  KAFKA-8333; Load high watermark checkpoint lazily when 
initializing replicas (#6800)
783ab74 is described below

commit 783ab74793cc7e541e6b8ed4e1c545bf5dcff959
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Jun 10 18:59:59 2019 -0700

    KAFKA-8333; Load high watermark checkpoint lazily when initializing 
replicas (#6800)
    
    Previously, the high watermark checkpoint file was read separately for
    every replica being loaded. This patch makes that loading lazy: the
    checkpoint file for each log directory is read on first use, and the
    resulting offset map is cached for reuse while a LeaderAndIsr request
    is being handled.
    
    Reviewers: Jun Rao <[email protected]>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 ++--
 .../server/checkpoints/OffsetCheckpointFile.scala  | 23 +++++++++++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 20 ++++++------
 .../checkpoints/OffsetCheckpointFileTest.scala     | 36 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8b383c2..64b6bee 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,7 +29,7 @@ import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints, 
SimpleOffsetCheckpoints}
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, 
OffsetCheckpoints}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.ElectionType
@@ -604,7 +604,7 @@ class ReplicaManager(val config: KafkaConfig,
           //   start ReplicaAlterDirThread to move data of this partition from 
the current log to the future log
           // - Otherwise, return KafkaStorageException. We do not create the 
future log while there is offline log directory
           //   so that we can avoid creating future log for the same partition 
in multiple log directories.
-          val highWatermarkCheckpoints = new 
SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+          val highWatermarkCheckpoints = new 
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
           if (partition.maybeCreateFutureReplica(destinationDir, 
highWatermarkCheckpoints)) {
             val futureReplica = futureLocalReplicaOrException(topicPartition)
             logManager.abortAndPauseCleaning(topicPartition)
@@ -1109,7 +1109,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
         val partitionsToBeFollower = partitionState -- 
partitionsTobeLeader.keys
 
-        val highWatermarkCheckpoints = new 
SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+        val highWatermarkCheckpoints = new 
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
         val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
           makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, 
correlationId, responseMap,
             highWatermarkCheckpoints)
diff --git 
a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala 
b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 715f42f..69e62d2 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -66,13 +66,26 @@ trait OffsetCheckpoints {
   def fetch(logDir: String, topicPartition: TopicPartition): Option[Long]
 }
 
-class SimpleOffsetCheckpoints(checkpointFilesByLogDir: Map[String, 
OffsetCheckpointFile])
-  extends OffsetCheckpoints {
+/**
+ * Loads checkpoint files on demand and caches the offsets for reuse.
+ */
+class LazyOffsetCheckpoints(checkpointsByLogDir: Map[String, 
OffsetCheckpointFile]) extends OffsetCheckpoints {
+  private val lazyCheckpointsByLogDir = checkpointsByLogDir.map { case 
(logDir, checkpointFile) =>
+    logDir -> new LazyOffsetCheckpointMap(checkpointFile)
+  }.toMap
 
   override def fetch(logDir: String, topicPartition: TopicPartition): 
Option[Long] = {
-    val checkpoint = checkpointFilesByLogDir(logDir)
-    val offsetMap = checkpoint.read()
-    offsetMap.get(topicPartition)
+    val offsetCheckpointFile = lazyCheckpointsByLogDir.getOrElse(logDir,
+      throw new IllegalArgumentException(s"No checkpoint file for log dir 
$logDir"))
+    offsetCheckpointFile.fetch(topicPartition)
+  }
+}
+
+class LazyOffsetCheckpointMap(checkpoint: OffsetCheckpointFile) {
+  private lazy val offsets: Map[TopicPartition, Long] = checkpoint.read()
+
+  def fetch(topicPartition: TopicPartition): Option[Long] = {
+    offsets.get(topicPartition)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 59248f0..64a29ed 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, 
ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
-import kafka.server.checkpoints.SimpleOffsetCheckpoints
+import kafka.server.checkpoints.LazyOffsetCheckpoints
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
@@ -87,7 +87,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new 
LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1, isNew = false, new 
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(1, isNew = false, new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -106,7 +106,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new 
LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1, isNew = false, new 
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(1, isNew = false, new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -160,7 +160,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new 
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
         collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -204,7 +204,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 
0))
-      partition.getOrCreateReplica(0, isNew = false, new 
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
@@ -255,7 +255,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 
0))
-      partition.getOrCreateReplica(0, isNew = false, new 
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
@@ -351,7 +351,7 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val partition = replicaManager.createPartition(new TopicPartition(topic, 
0))
-      partition.getOrCreateReplica(0, isNew = false, new 
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
@@ -417,7 +417,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new 
SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
@@ -552,7 +552,7 @@ class ReplicaManagerTest {
       // Create 2 partitions, assign replica 0 as the leader for both a 
different follower (1 and 2) for each
       val tp0 = new TopicPartition(topic, 0)
       val tp1 = new TopicPartition(topic, 1)
-      val offsetCheckpoints = new 
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
       replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false, 
offsetCheckpoints)
       replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false, 
offsetCheckpoints)
       val partition0Replicas = Seq[Integer](0, 1).asJava
@@ -645,7 +645,7 @@ class ReplicaManagerTest {
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
     val partition = replicaManager.createPartition(new TopicPartition(topic, 
topicPartition))
-    val offsetCheckpoints = new 
SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
     partition.getOrCreateReplica(followerBrokerId, isNew = false, 
offsetCheckpoints)
     partition.makeFollower(controllerId,
       leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
diff --git 
a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
 
b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index 2d20674..99a40b6 100644
--- 
a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -22,6 +22,8 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.junit.Assert._
 import org.junit.Test
+import org.mockito.Mockito
+import org.scalatest.Assertions.assertThrows
 
 import scala.collection.Map
 
@@ -98,4 +100,38 @@ class OffsetCheckpointFileTest extends Logging {
     new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()
   }
 
+  @Test
+  def testLazyOffsetCheckpoint(): Unit = {
+    val logDir = "/tmp/kafka-logs"
+    val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+
+    val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir -> 
mockCheckpointFile))
+    Mockito.verify(mockCheckpointFile, Mockito.never()).read()
+
+    val partition0 = new TopicPartition("foo", 0)
+    val partition1 = new TopicPartition("foo", 1)
+    val partition2 = new TopicPartition("foo", 2)
+
+    Mockito.when(mockCheckpointFile.read()).thenReturn(Map(
+      partition0 -> 1000L,
+      partition1 -> 2000L
+    ))
+
+    assertEquals(Some(1000L), lazyCheckpoints.fetch(logDir, partition0))
+    assertEquals(Some(2000L), lazyCheckpoints.fetch(logDir, partition1))
+    assertEquals(None, lazyCheckpoints.fetch(logDir, partition2))
+
+    Mockito.verify(mockCheckpointFile, Mockito.times(1)).read()
+  }
+
+  @Test
+  def testLazyOffsetCheckpointFileInvalidLogDir(): Unit = {
+    val logDir = "/tmp/kafka-logs"
+    val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+    val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir -> 
mockCheckpointFile))
+    assertThrows[IllegalArgumentException] {
+      lazyCheckpoints.fetch("/invalid/kafka-logs", new TopicPartition("foo", 
0))
+    }
+  }
+
 }

Reply via email to