This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f3a4ebc  KAFKA-6859; Do not send LeaderEpochRequest for undefined leader epochs (#5320)
f3a4ebc is described below

commit f3a4ebc8f8be12884ff67a63c7ba5ce5c5f67881
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Fri Aug 31 03:04:28 2018 +0300

    KAFKA-6859; Do not send LeaderEpochRequest for undefined leader epochs (#5320)
    
    If a broker or topic has a message format < 0.11, it does not track leader epochs. LeaderEpochRequests for such partitions will always return an undefined epoch, making the follower truncate to the high watermark. Since there is no point in using the network in these cases, don't send a request.
    
    Reviewers: Anna Povzner <[email protected]>, Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/log/LogSegment.scala     |   2 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   2 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  23 +++-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  |   2 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 152 ++++++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   2 +-
 6 files changed, 172 insertions(+), 11 deletions(-)
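
In short, the patch makes the follower answer undefined-epoch partitions locally and only put the remaining partitions on the wire. Below is a minimal, self-contained Scala sketch of that logic for illustration only; it is not the committed code, and the types and the sendOffsetsForLeaderEpochRequest helper are simplified stand-ins for Kafka's TopicPartition, EpochEndOffset and the OffsetsForLeaderEpoch round trip.

// Sketch (illustration, not the committed code) of the skip logic:
// partitions with an undefined local epoch are answered without a network request.
object LeaderEpochRequestSketch {
  val UndefinedEpoch: Int = -1
  val UndefinedEpochOffset: Long = -1L

  // Simplified stand-ins for Kafka's TopicPartition and EpochEndOffset.
  final case class TopicPartition(topic: String, partition: Int)
  final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

  // Stand-in for the OffsetsForLeaderEpoch network round trip to the leader.
  def sendOffsetsForLeaderEpochRequest(
      partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] =
    partitions.map { case (tp, epoch) => tp -> EpochEndOffset(epoch, 100L) }

  def fetchEpochsFromLeader(
      partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
    // Split out partitions with an undefined epoch (message format < 0.11 or an
    // empty epoch cache); the leader would only return UNDEFINED for them anyway.
    val (withEpoch, withoutEpoch) =
      partitions.partition { case (_, epoch) => epoch != UndefinedEpoch }
    val answeredLocally = withoutEpoch.map { case (tp, _) =>
      tp -> EpochEndOffset(UndefinedEpoch, UndefinedEpochOffset)
    }

    if (withEpoch.isEmpty) answeredLocally
    else sendOffsetsForLeaderEpochRequest(withEpoch) ++ answeredLocally
  }

  def main(args: Array[String]): Unit = {
    val result = fetchEpochsFromLeader(Map(
      TopicPartition("topic1", 0) -> 5,
      TopicPartition("topic2", 1) -> UndefinedEpoch))
    result.foreach(println) // topic2-1 gets an undefined offset without a request
  }
}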

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0b71670..0c00e55 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -358,7 +358,7 @@ class LogSegment private[log] (val log: FileRecords,
 
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
           leaderEpochCache.foreach { cache =>
-            if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
+            if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to avoid unnecessary warning in cache.assign()
               cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
           }
           updateProducerState(producerStateManager, batch)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 08c4a17..05dc356 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -147,7 +147,7 @@ class ReplicaAlterLogDirsThread(name: String,
 
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 56335a6..3b1a54f 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -95,7 +95,8 @@ class ReplicaFetcherThread(name: String,
   private val minBytes = brokerConfig.replicaFetchMinBytes
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
-  private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+  private val brokerSupportsLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+
   private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
 
   private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
@@ -346,19 +347,29 @@ class ReplicaFetcherThread(name: String,
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
     debug(s"Build leaderEpoch request $partitionsWithEpoch")
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
   override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
     var result: Map[TopicPartition, EpochEndOffset] = null
-    if (shouldSendLeaderEpochRequest) {
-      val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
+    if (brokerSupportsLeaderEpochRequest) {
+      // skip request for partitions without epoch, as their topic log message format doesn't support epochs
+      val (partitionsWithEpoch, partitionsWithoutEpoch) = partitions.partition { case (_, epoch) => epoch != UNDEFINED_EPOCH }
+      val resultWithoutEpoch = partitionsWithoutEpoch.map { case (tp, _) => (tp, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)) }
+
+      if (partitionsWithEpoch.isEmpty) {
+        debug("Skipping leaderEpoch request since all partitions do not have 
an epoch")
+        return resultWithoutEpoch
+      }
+
+      val partitionsAsJava = partitionsWithEpoch.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
       val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitionsAsJava)
       try {
         val response = leaderEndpoint.sendRequest(epochRequest)
-        result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
-        debug(s"Receive leaderEpoch response $result")
+
+        result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala ++ resultWithoutEpoch
+        debug(s"Receive leaderEpoch response $result; Skipped request for partitions ${partitionsWithoutEpoch.keys}")
       } catch {
         case t: Throwable =>
           warn(s"Error when sending leader epoch request for $partitions", t)
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 23a5305..88f5d6b 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.ListBuffer
 
 trait LeaderEpochCache {
   def assign(leaderEpoch: Int, offset: Long)
-  def latestEpoch(): Int
+  def latestEpoch: Int
   def endOffsetFor(epoch: Int): (Int, Long)
   def clearAndFlushLatest(offset: Long)
   def clearAndFlushEarliest(offset: Long)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index fbf7740..9c759ce 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -23,7 +23,9 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
@@ -31,7 +33,7 @@ import org.apache.kafka.common.requests.EpochEndOffset
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
 import org.easymock.EasyMock._
-import org.easymock.{Capture, CaptureType}
+import org.easymock.{Capture, CaptureType, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -44,6 +46,7 @@ class ReplicaFetcherThreadTest {
   private val t1p1 = new TopicPartition("topic1", 1)
   private val t2p1 = new TopicPartition("topic2", 1)
 
+  private var toFail = false
   private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
 
   @Test
@@ -71,6 +74,14 @@ class ReplicaFetcherThreadTest {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
     props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
     val config = KafkaConfig.fromProps(props)
+    val leaderEndpoint = createMock(classOf[BlockingSend])
+    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+      override def answer(): ClientResponse = {
+        toFail = true // assert no leader request is sent
+        createMock(classOf[ClientResponse])
+      }
+    })
+    replay(leaderEndpoint)
     val thread = new ReplicaFetcherThread(
       name = "bob",
       fetcherId = 0,
@@ -89,10 +100,149 @@ class ReplicaFetcherThreadTest {
      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
     )
 
+    assertFalse("Leader request should not have been sent", toFail)
+    assertEquals("results from leader epoch request should have undefined 
offset", expected, result)
+  }
+
+  /**
+    * If a partition doesn't have an epoch in the cache, this means it either
+    *   does not support epochs (log message format below 0.11) or it is a bootstrapping follower.
+    *
+    * In both cases, the partition has an undefined epoch
+    *   and there is no use to send the request, as we know the broker will answer with that epoch
+    */
+  @Test
+  def shouldNotSendEpochRequestIfLastEpochUndefinedForAllPartitions(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
+    val config = KafkaConfig.fromProps(props)
+    val leaderEndpoint = createMock(classOf[BlockingSend])
+
+    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+      override def answer(): ClientResponse = {
+        toFail = true // assert no leader request is sent
+        createMock(classOf[ClientResponse])
+      }
+    })
+    replay(leaderEndpoint)
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics =  new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(leaderEndpoint))
+
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> UNDEFINED_EPOCH, t1p1 -> UNDEFINED_EPOCH))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+    )
+
+    assertFalse("Leader request should not have been sent", toFail)
     assertEquals("results from leader epoch request should have undefined 
offset", expected, result)
   }
 
   @Test
+  def shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    //Setup all dependencies
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpoch = 5
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+    expect(leaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).once()  // t2p1 doesnt support epochs
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    stub(replica, partition, replicaManager)
+
+    //Expectations
+    expect(partition.truncateTo(anyLong(), anyBoolean())).once
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse
+    val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
+      t1p1 -> new EpochEndOffset(leaderEpoch, 1),
+      t2p1 -> new EpochEndOffset(-1, 1)).asJava
+
+    //Create the fetcher thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
+
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+    // topic 1 supports epoch, t2 doesn't
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0, t2p1 -> 0))
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = false, shouldBeTruncatingLog = true, shouldBeDelayed = false)
+    //Loop 1
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(1, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Loop 2 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(2, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Loop 3 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(3, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Assert that truncate to is called exactly once (despite two loops)
+    verify(logManager)
+  }
+
+  /**
+    * Assert that all partitions' states are as expected
+    *
+    */
+  def assertPartitionStates(states: PartitionStates[PartitionFetchState], shouldBeReadyForFetch: Boolean,
+                            shouldBeTruncatingLog: Boolean, shouldBeDelayed: Boolean): Unit = {
+    for (tp <- List(t1p0, t1p1, t2p1)) {
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeReadyForFetch) " NOT" else ""} be 
ready for fetching",
+        shouldBeReadyForFetch, states.stateValue(tp).isReadyForFetch)
+
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeTruncatingLog) " NOT" else ""} be 
truncating its log",
+        shouldBeTruncatingLog,
+        states.stateValue(tp).isTruncatingLog)
+
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeDelayed) " NOT" else ""} be 
delayed",
+        shouldBeDelayed,
+        states.stateValue(tp).isDelayed)
+    }
+  }
+
+  @Test
   def shouldHandleExceptionFromBlockingSend(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     val config = KafkaConfig.fromProps(props)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 171bcf3..41564a5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -625,7 +625,7 @@ class ReplicaManagerTest {
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
     val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
-    EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
+    EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn((leaderEpochFromLeader, localLogOffset))
     EasyMock.replay(mockLeaderEpochCache)
