Repository: kafka
Updated Branches:
  refs/heads/trunk c64cfd2e2 -> 249152062


KAFKA-5036; hold onto the leader lock in Partition while serving an OffsetForLeaderEpoch request

Author: Jun Rao <jun...@gmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Ben Stopford <benstopf...@gmail.com>

Closes #3074 from junrao/kafka-5036


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24915206
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24915206
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24915206

Branch: refs/heads/trunk
Commit: 24915206260c33cd7118db5359f3927d3be1ff60
Parents: c64cfd2
Author: Jun Rao <jun...@gmail.com>
Authored: Wed May 17 18:54:46 2017 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed May 17 18:54:46 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 45 +++++++----
 core/src/main/scala/kafka/log/Log.scala         |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  6 +-
 .../scala/kafka/server/ReplicaManager.scala     | 48 +++++------
 .../server/epoch/LeaderEpochFileCache.scala     |  2 +-
 .../epoch/OffsetsForLeaderEpochTest.scala       | 83 +++++++++++---------
 6 files changed, 100 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..f123a16 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,28 +16,30 @@
  */
 package kafka.cluster
 
-import kafka.common._
-import kafka.utils._
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.admin.AdminUtils
-import kafka.api.LeaderAndIsr
-import kafka.log.LogConfig
-import kafka.server._
-import kafka.metrics.KafkaMetricsGroup
-import kafka.controller.KafkaController
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.kafka.common.errors.{PolicyViolationException, 
NotEnoughReplicasException, NotLeaderForPartitionException}
-import org.apache.kafka.common.protocol.Errors
-
-import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
+import kafka.admin.AdminUtils
+import kafka.api.LeaderAndIsr
+import kafka.common.NotAssignedReplicaException
+import kafka.controller.KafkaController
+import kafka.log.LogConfig
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server._
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{NotEnoughReplicasException, 
NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.PartitionState
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState}
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
+
 /**
  * Data structure that represents a topic partition. The leader maintains the 
AR, ISR, CUR, RAR
  */
@@ -510,6 +512,21 @@ class Partition(val topic: String,
     }
   }
 
+  /**
+    * @param leaderEpoch Requested leader epoch
+    * @return The last offset of messages published under this leader epoch.
+    */
+  def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = {
+    inReadLock(leaderIsrUpdateLock) {
+      leaderReplicaIfLocal match {
+        case Some(leaderReplica) =>
+          new EpochEndOffset(NONE, 
leaderReplica.epochs.get.endOffsetFor(leaderEpoch))
+        case None =>
+          new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+      }
+    }
+  }
+
   private def updateIsr(newIsr: Set[Replica]) {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded,newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index dd22a26..7a47657 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -513,7 +513,7 @@ class Log(@volatile var dir: File,
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @return Information about the appended messages including the first and 
last offset.
    */
-  private def append(records: MemoryRecords, isFromClient: Boolean, 
assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = {
+  private def append(records: MemoryRecords, isFromClient: Boolean, 
assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
     val appendInfo = analyzeAndValidateRecords(records, isFromClient = 
isFromClient)
 
     // return if we have no valid messages or if this is a duplicate of the 
last appended entry

http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 33b696a..c3c37c1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1656,12 +1656,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit 
= {
-    val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest]
-    val requestInfo = offsetForEpoch.epochsByTopicPartition()
+    val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
+    val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()
     authorizeClusterAction(request)
 
     val responseBody = new OffsetsForLeaderEpochResponse(
-      replicaManager.getResponseFor(requestInfo)
+      replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
     )
     sendResponseExemptThrottle(request, new RequestChannel.Response(request, 
responseBody))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index eec9ab2..47b6d69 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -20,36 +20,32 @@ import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
-import org.apache.kafka.common.errors._
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
+import kafka.common.KafkaStorageException
 import kafka.controller.KafkaController
 import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, 
CorruptRecordException, InvalidTimestampException, InvalidTopicException, 
NotEnoughReplicasException, NotLeaderForPartitionException, 
OffsetOutOfRangeException, PolicyViolationException}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{ControllerMovedException, 
CorruptRecordException, InvalidTimestampException, InvalidTopicException, 
NotEnoughReplicasException, NotLeaderForPartitionException, 
OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{DeleteRecordsRequest, 
DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, 
UpdateMetadataRequest}
-import org.apache.kafka.common.requests._
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, 
DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, 
UpdateMetadataRequest, _}
+import org.apache.kafka.common.utils.Time
 
-import scala.collection._
 import scala.collection.JavaConverters._
-import java.util.{Map => JMap}
-
-import kafka.common.KafkaStorageException
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors._
-import org.apache.kafka.common.requests.EpochEndOffset._
+import scala.collection._
 
 /*
  * Result metadata of a log append operation on the log
@@ -1108,22 +1104,16 @@ class ReplicaManager(val config: KafkaConfig,
     new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, 
quotaManager)
   }
 
-  def getResponseFor(requestedEpochInfo: JMap[TopicPartition, Integer]): 
JMap[TopicPartition, EpochEndOffset] = {
-    OffsetsForLeaderEpoch.getResponseFor(this, requestedEpochInfo)
-  }
-}
-
-object OffsetsForLeaderEpoch extends Logging {
-  def getResponseFor(replicaManager: ReplicaManager, requestedEpochInfo: 
JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = {
-    debug(s"Processing OffsetForEpochRequest: $requestedEpochInfo")
-    requestedEpochInfo.asScala.map { case (tp, epoch) =>
-      val offset = try {
-        new EpochEndOffset(NONE, 
replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch))
-      } catch {
-        case _: NotLeaderForPartitionException => new 
EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
-        case _: UnknownTopicOrPartitionException => new 
EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+  def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, 
Integer]): Map[TopicPartition, EpochEndOffset] = {
+    requestedEpochInfo.map { case (tp, leaderEpoch) =>
+      val epochEndOffset = getPartition(tp) match {
+        case Some(partition) =>
+          partition.lastOffsetForLeaderEpoch(leaderEpoch)
+        case None =>
+          new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH_OFFSET)
       }
-      (tp, offset)
-    }.toMap.asJava
+      tp -> epochEndOffset
+    }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala 
b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 2b1ecc7..ffca900 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -180,7 +180,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, 
leo: () => LogOffsetM
     checkpoint.write(epochs)
   }
 
-  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, 
offset:$offset}, Latest: {epoch:$latestEpoch, offset$latestOffset} for 
Partition: $topicPartition"
+  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, 
offset:$offset}, Current: {epoch:$latestEpoch, offset$latestOffset} for 
Partition: $topicPartition"
 
   def validateAndMaybeWarn(epoch: Int, offset: Long) = {
     assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an 
epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")

http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 77b9068..d004641 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -16,83 +16,90 @@
   */
 package kafka.server.epoch
 
-import kafka.server.OffsetsForLeaderEpoch
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Replica
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotLeaderForPartitionException, 
UnknownTopicOrPartitionException}
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.EpochEndOffset
+import org.apache.kafka.common.requests.EpochEndOffset._
 import org.easymock.EasyMock._
-import org.junit.Test
 import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import org.junit.Test
 
 class OffsetsForLeaderEpochTest {
+  private val config = TestUtils.createBrokerConfigs(1, 
TestUtils.MockZkConnect).map(KafkaConfig.fromProps).head
+  private val time = new MockTime
+  private val metrics = new Metrics
+  private val tp = new TopicPartition("topic", 1)
 
   @Test
   def shouldGetEpochsFromReplica(): Unit = {
-    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
-    val replica = createNiceMock(classOf[kafka.cluster.Replica])
-    val cache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
-
     //Given
-    val tp = new TopicPartition("topic", 1)
     val offset = 42
     val epochRequested: Integer = 5
-    val request = mutable.Map(tp -> epochRequested).asJava
+    val request = Map(tp -> epochRequested)
 
     //Stubs
-    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
-    expect(replica.epochs).andReturn(Some(cache))
-    expect(cache.endOffsetFor(epochRequested)).andReturn(offset)
-    replay(replica, replicaManager, cache)
+    val mockLog = createNiceMock(classOf[kafka.log.Log])
+    val mockCache = 
createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+    expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
+    expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
+    replay(mockCache, mockLog)
+
+    // create a replica manager with 1 partition that has 1 replica
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, 
null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(config, metrics, time).follower, new 
BrokerTopicStats,
+      new MetadataCache(config.brokerId))
+    val partition = replicaManager.getOrCreatePartition(tp)
+    val leaderReplica = new Replica(config.brokerId, partition, time, 0, 
Some(mockLog))
+    partition.addReplicaIfNotExists(leaderReplica)
+    partition.leaderReplicaIdOpt = Some(config.brokerId)
 
     //When
-    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, 
request)
+    val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.NONE, offset), response.get(tp))
+    assertEquals(new EpochEndOffset(Errors.NONE, offset), response(tp))
   }
 
   @Test
-  def shonuldReturnNoLeaderForPartitionIfThrown(): Unit = {
-    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
+  def shouldReturnNoLeaderForPartitionIfThrown(): Unit = {
+    //create a replica manager with 1 partition that has 0 replica
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, 
null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(config, metrics, time).follower, new 
BrokerTopicStats,
+      new MetadataCache(config.brokerId))
+    replicaManager.getOrCreatePartition(tp)
 
     //Given
-    val tp = new TopicPartition("topic", 1)
     val epochRequested: Integer = 5
-    val request = mutable.Map(tp -> epochRequested).asJava
-
-    //Stubs
-    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new 
NotLeaderForPartitionException())
-    replay(replicaManager)
+    val request = Map(tp -> epochRequested)
 
     //When
-    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, 
request)
+    val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, 
UNDEFINED_EPOCH_OFFSET), response.get(tp))
+    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, 
UNDEFINED_EPOCH_OFFSET), response(tp))
   }
 
   @Test
   def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
-    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
+    //create a replica manager with 0 partition
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, 
null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(config, metrics, time).follower, new 
BrokerTopicStats,
+      new MetadataCache(config.brokerId))
 
     //Given
-    val tp = new TopicPartition("topic", 1)
     val epochRequested: Integer = 5
-    val request = mutable.Map(tp -> epochRequested).asJava
-
-    //Stubs
-    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new 
UnknownTopicOrPartitionException())
-    replay(replicaManager)
+    val request = Map(tp -> epochRequested)
 
     //When
-    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, 
request)
+    val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH_OFFSET), response.get(tp))
+    assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH_OFFSET), response(tp))
   }
 }
\ No newline at end of file

Reply via email to