This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a531872 KAFKA-6857; Leader should reply with undefined offset if
undefined leader epoch requested (#4967)
a531872 is described below
commit a5318722c74ec0279a2e831e93e4164d9adcb8a1
Author: Anna Povzner <[email protected]>
AuthorDate: Thu May 3 23:06:34 2018 -0700
KAFKA-6857; Leader should reply with undefined offset if undefined leader
epoch requested (#4967)
The leader must explicitly check if requested leader epoch is undefined,
and return undefined offset so that the follower can fall back to truncating to
high watermark. Otherwise, if the leader also is not tracking leader epochs, it
may return its LEO, which will the follower to truncate to the incorrect offset.
---
.../scala/kafka/server/epoch/LeaderEpochFileCache.scala | 9 ++++++---
.../kafka/server/epoch/LeaderEpochFileCacheTest.scala | 17 ++++++++++++++++-
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index eab0b9c..220432d 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -96,10 +96,13 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
leo: () => LogOffsetM
override def endOffsetFor(requestedEpoch: Int): Long = {
inReadLock(lock) {
val offset =
- if (requestedEpoch == latestEpoch) {
+ if (requestedEpoch == UNDEFINED_EPOCH) {
+ // this may happen if a bootstrapping follower sends a request with
undefined epoch or
+ // a follower is on the older message format where leader epochs are
not recorded
+ UNDEFINED_EPOCH_OFFSET
+ } else if (requestedEpoch == latestEpoch) {
leo().messageOffset
- }
- else {
+ } else {
val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
UNDEFINED_EPOCH_OFFSET
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 8460fe4..4a8df11 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -144,6 +144,21 @@ class LeaderEpochFileCacheTest {
}
@Test
+ def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
+ val leo = 73
+ def leoFinder() = new LogOffsetMetadata(leo)
+
+ //Given
+ val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+ //When (say a follower on older message format version) sends request for
UNDEFINED_EPOCH
+ val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
+
+ //Then
+ assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor)
+ }
+
+ @Test
def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
def leoFinder() = new LogOffsetMetadata(0)
@@ -664,4 +679,4 @@ class LeaderEpochFileCacheTest {
def setUp() {
checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
}
-}
\ No newline at end of file
+}
--
To stop receiving notification emails like this one, please contact
[email protected].