This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f3a4ebc KAFKA-6859; Do not send LeaderEpochRequest for undefined leader epochs (#5320)
f3a4ebc is described below
commit f3a4ebc8f8be12884ff67a63c7ba5ce5c5f67881
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Fri Aug 31 03:04:28 2018 +0300
KAFKA-6859; Do not send LeaderEpochRequest for undefined leader epochs (#5320)
If a broker or topic has a message format < 0.11, it does not track leader
epochs. LeaderEpochRequests for such partitions will always return an
undefined epoch, making the follower truncate to the high watermark. Since
there is no reason to use the network in such cases, don't send a request.
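In rough terms, the new behaviour can be sketched as follows (a simplified,
hypothetical outline, not the exact patch below; sendRequest stands in for the
real OffsetsForLeaderEpoch round trip):

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.EpochEndOffset
    import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}

    def fetchEpochs(partitions: Map[TopicPartition, Int],
                    sendRequest: Map[TopicPartition, Int] => Map[TopicPartition, EpochEndOffset])
      : Map[TopicPartition, EpochEndOffset] = {
      // Split out partitions whose latest local epoch is undefined.
      val (withEpoch, withoutEpoch) = partitions.partition { case (_, epoch) => epoch != UNDEFINED_EPOCH }
      // Answer those locally; the leader would return an undefined epoch anyway
      // and the follower then truncates to its high watermark.
      val local = withoutEpoch.map { case (tp, _) =>
        tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
      }
      // Use the network only for partitions that actually have an epoch to query.
      if (withEpoch.isEmpty) local else local ++ sendRequest(withEpoch)
    }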
Reviewers: Anna Povzner <[email protected]>, Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
.../kafka/server/ReplicaAlterLogDirsThread.scala | 2 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 23 +++-
.../kafka/server/epoch/LeaderEpochFileCache.scala | 2 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 152 ++++++++++++++++++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
6 files changed, 172 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0b71670..0c00e55 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -358,7 +358,7 @@ class LogSegment private[log] (val log: FileRecords,
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
- if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
+ if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to avoid unnecessary warning in cache.assign()
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 08c4a17..05dc356 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -147,7 +147,7 @@ class ReplicaAlterLogDirsThread(name: String,
val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
- val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+ val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 56335a6..3b1a54f 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -95,7 +95,8 @@ class ReplicaFetcherThread(name: String,
private val minBytes = brokerConfig.replicaFetchMinBytes
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
- private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+ private val brokerSupportsLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+
private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =
replicaMgr.getReplica(tp).map(_.epochs.get)
@@ -346,19 +347,29 @@ class ReplicaFetcherThread(name: String,
val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
debug(s"Build leaderEpoch request $partitionsWithEpoch")
- val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+ val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
}
override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
var result: Map[TopicPartition, EpochEndOffset] = null
- if (shouldSendLeaderEpochRequest) {
- val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
+ if (brokerSupportsLeaderEpochRequest) {
+ // skip request for partitions without epoch, as their topic log message format doesn't support epochs
+ val (partitionsWithEpoch, partitionsWithoutEpoch) = partitions.partition { case (_, epoch) => epoch != UNDEFINED_EPOCH }
+ val resultWithoutEpoch = partitionsWithoutEpoch.map { case (tp, _) => (tp, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)) }
+
+ if (partitionsWithEpoch.isEmpty) {
+ debug("Skipping leaderEpoch request since all partitions do not have an epoch")
+ return resultWithoutEpoch
+ }
+
+ val partitionsAsJava = partitionsWithEpoch.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitionsAsJava)
try {
val response = leaderEndpoint.sendRequest(epochRequest)
- result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
- debug(s"Receive leaderEpoch response $result")
+
+ result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala ++ resultWithoutEpoch
+ debug(s"Receive leaderEpoch response $result; Skipped request for partitions ${partitionsWithoutEpoch.keys}")
} catch {
case t: Throwable =>
warn(s"Error when sending leader epoch request for $partitions", t)
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 23a5305..88f5d6b 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.ListBuffer
trait LeaderEpochCache {
def assign(leaderEpoch: Int, offset: Long)
- def latestEpoch(): Int
+ def latestEpoch: Int
def endOffsetFor(epoch: Int): (Int, Long)
def clearAndFlushLatest(offset: Long)
def clearAndFlushEarliest(offset: Long)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index fbf7740..9c759ce 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -23,7 +23,9 @@ import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.epoch.LeaderEpochCache
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.TestUtils
+import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.protocol.Errors._
@@ -31,7 +33,7 @@ import org.apache.kafka.common.requests.EpochEndOffset
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.utils.SystemTime
import org.easymock.EasyMock._
-import org.easymock.{Capture, CaptureType}
+import org.easymock.{Capture, CaptureType, IAnswer}
import org.junit.Assert._
import org.junit.Test
@@ -44,6 +46,7 @@ class ReplicaFetcherThreadTest {
private val t1p1 = new TopicPartition("topic1", 1)
private val t2p1 = new TopicPartition("topic2", 1)
+ private var toFail = false
private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
@Test
@@ -71,6 +74,14 @@ class ReplicaFetcherThreadTest {
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
val config = KafkaConfig.fromProps(props)
+ val leaderEndpoint = createMock(classOf[BlockingSend])
+ expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+ override def answer(): ClientResponse = {
+ toFail = true // assert no leader request is sent
+ createMock(classOf[ClientResponse])
+ }
+ })
+ replay(leaderEndpoint)
val thread = new ReplicaFetcherThread(
name = "bob",
fetcherId = 0,
@@ -89,10 +100,149 @@ class ReplicaFetcherThreadTest {
t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
)
+ assertFalse("Leader request should not have been sent", toFail)
+ assertEquals("results from leader epoch request should have undefined
offset", expected, result)
+ }
+
+ /**
+ * If a partition doesn't have an epoch in the cache, this means it either
+ * does not support epochs (log message format below 0.11) or it is a bootstrapping follower.
+ *
+ * In both cases, the partition has an undefined epoch
+ * and there is no use to send the request, as we know the broker will answer with that epoch
+ */
+ @Test
+ def shouldNotSendEpochRequestIfLastEpochUndefinedForAllPartitions(): Unit = {
+ val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
+ val config = KafkaConfig.fromProps(props)
+ val leaderEndpoint = createMock(classOf[BlockingSend])
+
+ expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+ override def answer(): ClientResponse = {
+ toFail = true // assert no leader request is sent
+ createMock(classOf[ClientResponse])
+ }
+ })
+ replay(leaderEndpoint)
+ val thread = new ReplicaFetcherThread(
+ name = "bob",
+ fetcherId = 0,
+ sourceBroker = brokerEndPoint,
+ brokerConfig = config,
+ replicaMgr = null,
+ metrics = new Metrics(),
+ time = new SystemTime(),
+ quota = null,
+ leaderEndpointBlockingSend = Some(leaderEndpoint))
+
+
+ val result = thread.fetchEpochsFromLeader(Map(t1p0 -> UNDEFINED_EPOCH, t1p1 -> UNDEFINED_EPOCH))
+
+ val expected = Map(
+ t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
+ t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+ )
+
+ assertFalse("Leader request should not have been sent", toFail)
assertEquals("results from leader epoch request should have undefined
offset", expected, result)
}
@Test
+ def shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+ //Setup all dependencies
+ val quota = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+ val logManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica = createNiceMock(classOf[Replica])
+ val partition = createMock(classOf[Partition])
+ val replicaManager = createMock(classOf[ReplicaManager])
+
+ val leaderEpoch = 5
+
+ //Stubs
+ expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+ expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+ expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
+ expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+ expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+ expect(leaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).once() // t2p1 doesnt support epochs
+ expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes()
+ expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+ expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+ stub(replica, partition, replicaManager)
+
+ //Expectations
+ expect(partition.truncateTo(anyLong(), anyBoolean())).once
+
+ replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+ //Define the offsets for the OffsetsForLeaderEpochResponse
+ val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
+ t1p1 -> new EpochEndOffset(leaderEpoch, 1),
+ t2p1 -> new EpochEndOffset(-1, 1)).asJava
+
+ //Create the fetcher thread
+ val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
+
+ val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+ // topic 1 supports epoch, t2 doesn't
+ thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0, t2p1 -> 0))
+
+ assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = false, shouldBeTruncatingLog = true, shouldBeDelayed = false)
+ //Loop 1
+ thread.doWork()
+ assertEquals(1, mockNetwork.epochFetchCount)
+ assertEquals(1, mockNetwork.fetchCount)
+
+ assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+ //Loop 2 we should not fetch epochs
+ thread.doWork()
+ assertEquals(1, mockNetwork.epochFetchCount)
+ assertEquals(2, mockNetwork.fetchCount)
+
+ assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+ //Loop 3 we should not fetch epochs
+ thread.doWork()
+ assertEquals(1, mockNetwork.epochFetchCount)
+ assertEquals(3, mockNetwork.fetchCount)
+
+ assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+ //Assert that truncate to is called exactly once (despite two loops)
+ verify(logManager)
+ }
+
+ /**
+ * Assert that all partitions' states are as expected
+ *
+ */
+ def assertPartitionStates(states: PartitionStates[PartitionFetchState], shouldBeReadyForFetch: Boolean,
+ shouldBeTruncatingLog: Boolean, shouldBeDelayed: Boolean): Unit = {
+ for (tp <- List(t1p0, t1p1, t2p1)) {
+ assertEquals(
+ s"Partition $tp should${if (!shouldBeReadyForFetch) " NOT" else ""} be
ready for fetching",
+ shouldBeReadyForFetch, states.stateValue(tp).isReadyForFetch)
+
+ assertEquals(
+ s"Partition $tp should${if (!shouldBeTruncatingLog) " NOT" else ""} be
truncating its log",
+ shouldBeTruncatingLog,
+ states.stateValue(tp).isTruncatingLog)
+
+ assertEquals(
+ s"Partition $tp should${if (!shouldBeDelayed) " NOT" else ""} be
delayed",
+ shouldBeDelayed,
+ states.stateValue(tp).isDelayed)
+ }
+ }
+
+ @Test
def shouldHandleExceptionFromBlockingSend(): Unit = {
val props = TestUtils.createBrokerConfig(1, "localhost:1234")
val config = KafkaConfig.fromProps(props)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 171bcf3..41564a5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -625,7 +625,7 @@ class ReplicaManagerTest {
val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
- EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
+ EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
.andReturn((leaderEpochFromLeader, localLogOffset))
EasyMock.replay(mockLeaderEpochCache)