This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d152989  KAFKA-7897; Disable leader epoch cache when older message 
formats are used (#6232)
d152989 is described below

commit d152989f26f51b9004b881397db818ad6eaf0392
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Feb 8 09:22:13 2019 -0800

    KAFKA-7897; Disable leader epoch cache when older message formats are used 
(#6232)
    
    When an older message format is in use, we should disable the leader epoch 
cache so that we resort to truncation by high watermark. Previously we updated 
the cache for all versions when a broker became leader for a partition. This 
can cause large and unnecessary truncations after leader changes because we 
relied on the presence of _any_ cached epoch in order to tell whether to use 
the improved truncation logic possible with the OffsetsForLeaderEpoch API.
    
    Reviewers: Stanislav Kozlovski <[email protected]>, Viktor 
Somogyi-Vass <[email protected]>, Jun Rao <[email protected]>
---
 .../apache/kafka/common/record/RecordVersion.java  |   9 +
 .../requests/OffsetsForLeaderEpochRequest.java     |   4 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |  10 +-
 core/src/main/scala/kafka/cluster/Replica.scala    |  21 ++-
 core/src/main/scala/kafka/log/Log.scala            |  98 ++++++++---
 core/src/main/scala/kafka/log/LogSegment.scala     |   2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala | 102 +++++++-----
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  15 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  69 +++-----
 .../checkpoints/LeaderEpochCheckpointFile.scala    |   8 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  |  12 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  45 ++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  89 +++++++++-
 .../kafka/server/AbstractFetcherThreadTest.scala   |  78 ++++++++-
 .../unit/kafka/server/ISRExpirationTest.scala      |   4 -
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  62 +++----
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 185 +++++----------------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  13 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |   2 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala    |  16 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  10 +-
 21 files changed, 496 insertions(+), 358 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java 
b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
index 1f80d62..8406d53 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
@@ -34,6 +34,15 @@ public enum RecordVersion {
         this.value = (byte) value;
     }
 
+    /**
+     * Check whether this version precedes another version.
+     *
+     * @return true only if the magic value is less than the other's
+     */
+    public boolean precedes(RecordVersion other) {
+        return this.value < other.value;
+    }
+
     public static RecordVersion lookup(byte value) {
         if (value < 0 || value >= VALUES.length)
             throw new IllegalArgumentException("Unknown record version: " + 
value);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 57949a0..75788a0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -98,8 +98,8 @@ public class OffsetsForLeaderEpochRequest extends 
AbstractRequest {
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
-            bld.append("(type=OffsetForLeaderEpochRequest, ").
-                    append("epochsByTopic=").append(epochsByPartition).
+            bld.append("OffsetsForLeaderEpochRequest(").
+                    append("epochsByPartition=").append(epochsByPartition).
                     append(")");
             return bld.toString();
         }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index e731111..dccce9c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -392,8 +392,8 @@ class Partition(val topicPartition: TopicPartition,
       // to ensure that these followers can truncate to the right offset, we 
must cache the new
       // leader epoch and the start offset since it should be larger than any 
epoch that a follower
       // would try to query.
-      leaderReplica.epochs.foreach { epochCache =>
-        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      leaderReplica.log.foreach { log =>
+        log.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
       }
 
       val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
@@ -983,8 +983,10 @@ class Partition(val topicPartition: TopicPartition,
       val localReplicaOrError = getLocalReplica(localBrokerId, 
currentLeaderEpoch, fetchOnlyFromLeader)
       localReplicaOrError match {
         case Left(replica) =>
-          val (epoch, offset) = replica.epochs.get.endOffsetFor(leaderEpoch)
-          new EpochEndOffset(NONE, epoch, offset)
+          replica.endOffsetForEpoch(leaderEpoch) match {
+            case Some(epochAndOffset) => new EpochEndOffset(NONE, 
epochAndOffset.leaderEpoch, epochAndOffset.offset)
+            case None => new EpochEndOffset(NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
+          }
         case Right(error) =>
           new EpochEndOffset(error, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
       }
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index b8e831a..c66eb5c 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -17,10 +17,9 @@
 
 package kafka.cluster
 
-import kafka.server.epoch.LeaderEpochFileCache
 import kafka.log.{Log, LogOffsetSnapshot}
 import kafka.utils.Logging
-import kafka.server.{LogOffsetMetadata, LogReadResult}
+import kafka.server.{LogOffsetMetadata, LogReadResult, OffsetAndEpoch}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Time
@@ -55,8 +54,6 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
-  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
-
   info(s"Replica loaded for partition $topicPartition with initial high 
watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
 
@@ -99,6 +96,22 @@ class Replica(val brokerId: Int,
     }
   }
 
+  def latestEpoch: Option[Int] = {
+    if (isLocal) {
+      log.get.latestEpoch
+    } else {
+      throw new KafkaException(s"Cannot get latest epoch of non-local replica 
of $topicPartition")
+    }
+  }
+
+  def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
+    if (isLocal) {
+      log.get.endOffsetForEpoch(leaderEpoch)
+    } else {
+      throw new KafkaException(s"Cannot lookup end offset for epoch of 
non-local replica of $topicPartition")
+    }
+  }
+
   def logEndOffset: LogOffsetMetadata =
     if (isLocal)
       log.get.logEndOffsetMetadata
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index c448805..705ae75 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,15 +32,15 @@ import kafka.api.KAFKA_0_10_0_IV0
 import kafka.common.{LogSegmentOffsetOverflowException, LongRef, 
OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, 
NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
+import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, 
LogOffsetMetadata}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, 
LogOffsetMetadata, OffsetAndEpoch}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
-import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 
@@ -237,7 +237,15 @@ class Log(@volatile var dir: File,
       warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is 
set to ${newConfig.retentionMs}. It is smaller than " +
         s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value 
${newConfig.messageTimestampDifferenceMaxMs}. " +
         s"This may result in frequent log rolling.")
+    val oldConfig = this.config
     this.config = newConfig
+    if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
+      val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+      val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+      if (newRecordVersion.precedes(oldRecordVersion))
+        warn(s"Record format version has been downgraded from 
$oldRecordVersion to $newRecordVersion.")
+      initializeLeaderEpochCache()
+    }
   }
 
   private def checkIfMemoryMappedBufferClosed(): Unit = {
@@ -270,22 +278,28 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = 
new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochFileCache = 
initializeLeaderEpochCache()
+  // Visible for testing
+  @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
 
   locally {
     val startMs = time.milliseconds
 
+    // create the log directory if it doesn't exist
+    Files.createDirectories(dir.toPath)
+
+    initializeLeaderEpochCache()
+
     val nextOffset = loadSegments()
 
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(nextOffset, 
activeSegment.baseOffset, activeSegment.size)
 
-    _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)
+    
leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
 
     logStartOffset = math.max(logStartOffset, 
segments.firstEntry.getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. 
Recover it here.
-    _leaderEpochCache.truncateFromStart(logStartOffset)
+    leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
 
     // Any segment loading or recovery code must not use producerStateManager, 
so that we can build the full state here
     // from scratch.
@@ -335,13 +349,30 @@ class Log(@volatile var dir: File,
   /** The name of this log */
   def name  = dir.getName()
 
-  def leaderEpochCache: LeaderEpochFileCache = _leaderEpochCache
+  def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion
 
-  private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
-    // create the log directory if it doesn't exist
-    Files.createDirectories(dir.toPath)
-    val checkpointFile = new 
LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
-    new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
+  private def initializeLeaderEpochCache(): Unit = lock synchronized {
+    val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
+
+    def newLeaderEpochFileCache(): LeaderEpochFileCache = {
+      val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, 
logDirFailureChannel)
+      new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
+    }
+
+    if (recordVersion.precedes(RecordVersion.V2)) {
+      val currentCache = if (leaderEpochFile.exists())
+        Some(newLeaderEpochFileCache())
+      else
+        None
+
+      if (currentCache.exists(_.nonEmpty))
+        warn(s"Deleting non-empty leader epoch cache due to incompatible 
message format $recordVersion")
+
+      Files.deleteIfExists(leaderEpochFile.toPath)
+      leaderEpochCache = None
+    } else {
+      leaderEpochCache = Some(newLeaderEpochFileCache())
+    }
   }
 
   /**
@@ -580,7 +611,7 @@ class Log(@volatile var dir: File,
         info(s"Recovering unflushed segment ${segment.baseOffset}")
         val truncatedBytes =
           try {
-            recoverSegment(segment, Some(_leaderEpochCache))
+            recoverSegment(segment, leaderEpochCache)
           } catch {
             case _: InvalidOffsetException =>
               val startOffset = segment.baseOffset
@@ -734,7 +765,7 @@ class Log(@volatile var dir: File,
           producerStateManager.logDir = dir
           // re-initialize leader epoch cache so that 
LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
-          _leaderEpochCache = initializeLeaderEpochCache()
+          initializeLeaderEpochCache()
         }
       }
     }
@@ -875,7 +906,7 @@ class Log(@volatile var dir: File,
         // update the epoch cache with the epoch stamped onto the message by 
the leader
         validRecords.batches.asScala.foreach { batch =>
           if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-            _leaderEpochCache.assign(batch.partitionLeaderEpoch, 
batch.baseOffset)
+            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, 
batch.baseOffset)
         }
 
         // check messages set size may be exceed config.segmentSize
@@ -944,6 +975,24 @@ class Log(@volatile var dir: File,
     }
   }
 
+  def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit = 
{
+    leaderEpochCache.foreach { cache =>
+      cache.assign(leaderEpoch, startOffset)
+    }
+  }
+
+  def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch)
+
+  def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
+    leaderEpochCache.flatMap { cache =>
+      val (foundEpoch, foundOffset) = cache.endOffsetFor(leaderEpoch)
+      if (foundOffset == EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
+        None
+      else
+        Some(OffsetAndEpoch(foundOffset, foundEpoch))
+    }
+  }
+
   def onHighWatermarkIncremented(highWatermark: Long): Unit = {
     lock synchronized {
       replicaHighWatermark = Some(highWatermark)
@@ -982,7 +1031,7 @@ class Log(@volatile var dir: File,
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset to $newLogStartOffset")
           logStartOffset = newLogStartOffset
-          _leaderEpochCache.truncateFromStart(logStartOffset)
+          leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
           producerStateManager.truncateHead(logStartOffset)
           updateFirstUnstableOffset()
         }
@@ -1284,19 +1333,16 @@ class Log(@volatile var dir: File,
         // The first cached epoch usually corresponds to the log start offset, 
but we have to verify this since
         // it may not be true following a message format version bump as the 
epoch will not be available for
         // log entries written in the older format.
-        val earliestEpochEntry = leaderEpochCache.earliestEntry
+        val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry)
         val epochOpt = earliestEpochEntry match {
           case Some(entry) if entry.startOffset <= logStartOffset => 
Optional.of[Integer](entry.epoch)
           case _ => Optional.empty[Integer]()
         }
         return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
logStartOffset, epochOpt))
       } else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
-        val latestEpoch = leaderEpochCache.latestEpoch
-        val epochOpt = if (latestEpoch == 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
-          Optional.empty[Integer]()
-        else
-          Optional.of[Integer](latestEpoch)
-        return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
logEndOffset, epochOpt))
+        val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
logEndOffset, epochOptional))
       }
 
       val targetSeg = {
@@ -1711,7 +1757,7 @@ class Log(@volatile var dir: File,
         removeLogMetrics()
         logSegments.foreach(_.deleteIfExists())
         segments.clear()
-        _leaderEpochCache.clear()
+        leaderEpochCache.foreach(_.clear())
         Utils.delete(dir)
         // File handlers will be closed if this log is deleted
         isMemoryMappedBufferClosed = true
@@ -1766,7 +1812,7 @@ class Log(@volatile var dir: File,
             updateLogEndOffset(targetOffset)
             this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
             this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-            _leaderEpochCache.truncateFromEnd(targetOffset)
+            leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
             loadProducerState(targetOffset, reloadFromCleanShutdown = false)
           }
           true
@@ -1795,7 +1841,7 @@ class Log(@volatile var dir: File,
           initFileSize = initFileSize,
           preallocate = config.preallocate))
         updateLogEndOffset(newOffset)
-        _leaderEpochCache.clearAndFlush()
+        leaderEpochCache.foreach(_.clearAndFlush())
 
         producerStateManager.truncate()
         producerStateManager.updateMapEndOffset(newOffset)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index a700aeb..13093b4 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -358,7 +358,7 @@ class LogSegment private[log] (val log: FileRecords,
 
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
           leaderEpochCache.foreach { cache =>
-            if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to 
avoid unnecessary warning in cache.assign()
+            if (batch.partitionLeaderEpoch > 0 && 
cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
               cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
           }
           updateProducerState(producerStateManager, batch)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a271706..7f7ed1f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -86,7 +86,7 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): 
Option[OffsetAndEpoch]
 
-  protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset]
+  protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset]
 
   protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Seq[(TopicPartition, FetchData)]
 
@@ -94,6 +94,8 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, 
currentLeaderEpoch: Int): Long
 
+  protected def isOffsetForLeaderEpochSupported: Boolean
+
   override def shutdown() {
     initiateShutdown()
     inLock(partitionMapLock) {
@@ -141,27 +143,35 @@ abstract class AbstractFetcherThread(name: String,
    * Builds offset for leader epoch requests for partitions that are in the 
truncating phase based
    * on latest epochs of the future replicas (the one that is fetching)
    */
-  private def buildLeaderEpochRequest(): 
ResultWithPartitions[Map[TopicPartition, EpochData]] = inLock(partitionMapLock) 
{
-    val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
+  private def fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], 
Set[TopicPartition]) = inLock(partitionMapLock) {
     val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
+    val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
 
     partitionStates.stream().forEach(new 
Consumer[PartitionStates.PartitionState[PartitionFetchState]] {
       override def accept(state: 
PartitionStates.PartitionState[PartitionFetchState]): Unit = {
-        val tp = state.topicPartition
         if (state.value.isTruncating) {
+          val tp = state.topicPartition
           latestEpoch(tp) match {
-            case Some(latestEpoch) =>
-              val partitionData = new 
EpochData(Optional.of(state.value.currentLeaderEpoch), latestEpoch)
-              partitionsWithEpochs += tp -> partitionData
-            case None =>
+            case Some(epoch) if isOffsetForLeaderEpochSupported =>
+              partitionsWithEpochs += tp -> new 
EpochData(Optional.of(state.value.currentLeaderEpoch), epoch)
+            case _ =>
               partitionsWithoutEpochs += tp
           }
         }
       }
     })
 
-    debug(s"Build leaderEpoch request $partitionsWithEpochs")
-    ResultWithPartitions(partitionsWithEpochs, partitionsWithoutEpochs)
+    (partitionsWithEpochs, partitionsWithoutEpochs)
+  }
+
+  private def maybeTruncate(): Unit = {
+    val (partitionsWithEpochs, partitionsWithoutEpochs) = 
fetchTruncatingPartitions()
+    if (partitionsWithEpochs.nonEmpty) {
+      truncateToEpochEndOffsets(partitionsWithEpochs)
+    }
+    if (partitionsWithoutEpochs.nonEmpty) {
+      truncateToHighWatermark(partitionsWithoutEpochs)
+    }
   }
 
   /**
@@ -173,33 +183,52 @@ abstract class AbstractFetcherThread(name: String,
     *   truncation complete. Do this within a lock to ensure no leadership 
changes can
     *   occur during truncation.
     */
-  private def maybeTruncate(): Unit = {
-    val ResultWithPartitions(epochRequests, partitionsWithError) = 
buildLeaderEpochRequest()
-    handlePartitionsWithErrors(partitionsWithError)
-
-    if (epochRequests.nonEmpty) {
-      val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
-      //Ensure we hold a lock during truncation.
-      inLock(partitionMapLock) {
-        //Check no leadership and no leader epoch changes happened whilst we 
were unlocked, fetching epochs
-        val leaderEpochs = fetchedEpochs.filter { case (tp, _) =>
-          val curPartitionState = partitionStates.stateValue(tp)
-          val partitionEpochRequest = epochRequests.get(tp).getOrElse {
-            throw new IllegalStateException(
-              s"Leader replied with partition $tp not requested in 
OffsetsForLeaderEpoch request")
-          }
-          val leaderEpochInRequest = 
partitionEpochRequest.currentLeaderEpoch.get
-          curPartitionState != null && leaderEpochInRequest == 
curPartitionState.currentLeaderEpoch
+  private def truncateToEpochEndOffsets(latestEpochsForPartitions: 
Map[TopicPartition, EpochData]): Unit = {
+    val endOffsets = fetchEpochEndOffsets(latestEpochsForPartitions)
+    //Ensure we hold a lock during truncation.
+    inLock(partitionMapLock) {
+      //Check no leadership and no leader epoch changes happened whilst we 
were unlocked, fetching epochs
+      val epochEndOffsets = endOffsets.filter { case (tp, _) =>
+        val curPartitionState = partitionStates.stateValue(tp)
+        val partitionEpochRequest = 
latestEpochsForPartitions.get(tp).getOrElse {
+          throw new IllegalStateException(
+            s"Leader replied with partition $tp not requested in 
OffsetsForLeaderEpoch request")
         }
+        val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
+        curPartitionState != null && leaderEpochInRequest == 
curPartitionState.currentLeaderEpoch
+      }
 
-        val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncate(leaderEpochs)
-        handlePartitionsWithErrors(partitionsWithError)
-        updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+      val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets)
+      handlePartitionsWithErrors(partitionsWithError)
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+    }
+  }
+
+  private def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = 
inLock(partitionMapLock) {
+    val fetchOffsets = mutable.HashMap.empty[TopicPartition, 
OffsetTruncationState]
+    val partitionsWithError = mutable.HashSet.empty[TopicPartition]
+
+    for (tp <- partitions) {
+      try {
+        val highWatermark = partitionStates.stateValue(tp).fetchOffset
+        val truncationState = OffsetTruncationState(highWatermark, 
truncationCompleted = true)
+
+        info(s"Truncating partition $tp to local high watermark 
$highWatermark")
+        truncate(tp, truncationState)
+
+        fetchOffsets.put(tp, truncationState)
+      } catch {
+        case e: KafkaStorageException =>
+          info(s"Failed to truncate $tp", e)
+          partitionsWithError += tp
       }
     }
+
+    handlePartitionsWithErrors(partitionsWithError)
+    updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
   }
 
-  private def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, 
OffsetTruncationState]] = {
+  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: 
Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, 
OffsetTruncationState]] = {
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, 
OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
@@ -207,7 +236,8 @@ abstract class AbstractFetcherThread(name: String,
       try {
         leaderEpochOffset.error match {
           case Errors.NONE =>
-            val offsetTruncationState = truncate(tp, leaderEpochOffset)
+            val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset)
+            truncate(tp, offsetTruncationState)
             fetchOffsets.put(tp, offsetTruncationState)
 
           case Errors.FENCED_LEADER_EPOCH =>
@@ -236,12 +266,6 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  private def truncate(topicPartition: TopicPartition, epochEndOffset: 
EpochEndOffset): OffsetTruncationState = {
-    val offsetTruncationState = getOffsetTruncationState(topicPartition, 
epochEndOffset)
-    truncate(topicPartition, offsetTruncationState)
-    offsetTruncationState
-  }
-
   private def processFetchRequest(fetchStates: Map[TopicPartition, 
PartitionFetchState],
                                   fetchRequest: FetchRequest.Builder): Unit = {
     val partitionsWithError = mutable.Set[TopicPartition]()
@@ -525,7 +549,7 @@ abstract class AbstractFetcherThread(name: String,
     if (leaderEndOffset < replicaEndOffset) {
       warn(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
-      truncate(topicPartition, new EpochEndOffset(Errors.NONE, 
UNDEFINED_EPOCH, leaderEndOffset))
+      truncate(topicPartition, OffsetTruncationState(leaderEndOffset, 
truncationCompleted = true))
       leaderEndOffset
     } else {
       /**
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala 
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 7761f35..54bb2a2 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -53,7 +53,7 @@ class ReplicaAlterLogDirsThread(name: String,
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
   override protected def latestEpoch(topicPartition: TopicPartition): 
Option[Int] = {
-    
replicaMgr.futureLocalReplicaOrException(topicPartition).epochs.map(_.latestEpoch)
+    replicaMgr.futureLocalReplicaOrException(topicPartition).latestEpoch
   }
 
   override protected def logEndOffset(topicPartition: TopicPartition): Long = {
@@ -61,14 +61,7 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override protected def endOffsetForEpoch(topicPartition: TopicPartition, 
epoch: Int): Option[OffsetAndEpoch] = {
-    val replica = replicaMgr.futureLocalReplicaOrException(topicPartition)
-    replica.epochs.flatMap { epochCache =>
-      val (foundEpoch, foundOffset) = epochCache.endOffsetFor(epoch)
-      if (foundOffset == UNDEFINED_EPOCH_OFFSET)
-        None
-      else
-        Some(OffsetAndEpoch(foundOffset, foundEpoch))
-    }
+    
replicaMgr.futureLocalReplicaOrException(topicPartition).endOffsetForEpoch(epoch)
   }
 
   def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Seq[(TopicPartition, FetchData)] = {
@@ -145,7 +138,7 @@ class ReplicaAlterLogDirsThread(name: String,
    * @param partitions map of topic partition -> leader epoch of the future 
replica
    * @return map of topic partition -> end offset for a requested leader epoch
    */
-  override def fetchEpochsFromLeader(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] = {
+  override def fetchEpochEndOffsets(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] = {
     partitions.map { case (tp, epochData) =>
       try {
         val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) {
@@ -166,6 +159,8 @@ class ReplicaAlterLogDirsThread(name: String,
     }
   }
 
+  override protected def isOffsetForLeaderEpochSupported: Boolean = true
+
   /**
    * Truncate the log for each partition based on current replica's returned 
epoch and offset.
    *
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 66b81e5..64ae71e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -95,7 +95,7 @@ class ReplicaFetcherThread(name: String,
   private val fetchSessionHandler = new FetchSessionHandler(logContext, 
sourceBroker.id)
 
   override protected def latestEpoch(topicPartition: TopicPartition): 
Option[Int] = {
-    
replicaMgr.localReplicaOrException(topicPartition).epochs.map(_.latestEpoch)
+    replicaMgr.localReplicaOrException(topicPartition).latestEpoch
   }
 
   override protected def logEndOffset(topicPartition: TopicPartition): Long = {
@@ -103,14 +103,7 @@ class ReplicaFetcherThread(name: String,
   }
 
   override protected def endOffsetForEpoch(topicPartition: TopicPartition, 
epoch: Int): Option[OffsetAndEpoch] = {
-    val replica = replicaMgr.localReplicaOrException(topicPartition)
-    replica.epochs.flatMap { epochCache =>
-      val (foundEpoch, foundOffset) = epochCache.endOffsetFor(epoch)
-      if (foundOffset == UNDEFINED_EPOCH_OFFSET)
-        None
-      else
-        Some(OffsetAndEpoch(foundOffset, foundEpoch))
-    }
+    replicaMgr.localReplicaOrException(topicPartition).endOffsetForEpoch(epoch)
   }
 
   override def initiateShutdown(): Boolean = {
@@ -299,48 +292,36 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
-  override def fetchEpochsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
-    def undefinedResponseMap(error: Errors,
-                             requestMap: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
-      requestMap.map { case (tp, _) =>
-        tp -> new EpochEndOffset(error, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
-      }
+  override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+
+    if (partitions.isEmpty) {
+      debug("Skipping leaderEpoch request since all partitions do not have an epoch")
+      return Map.empty
     }
 
-    if (brokerSupportsLeaderEpochRequest) {
-      // skip request for partitions without epoch, as their topic log message format doesn't support epochs
-      val (partitionsWithEpoch, partitionsWithoutEpoch) = partitions.partition { case (_, epochData) =>
-        epochData.leaderEpoch != UNDEFINED_EPOCH
-      }
-      val resultWithoutEpoch = undefinedResponseMap(Errors.NONE, partitionsWithoutEpoch)
-      if (partitionsWithEpoch.isEmpty) {
-        debug("Skipping leaderEpoch request since all partitions do not have an epoch")
-        return resultWithoutEpoch
-      }
+    val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitions.asJava)
+    debug(s"Sending offset for leader epoch request $epochRequest")
 
-      val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion,
-        partitionsWithEpoch.asJava)
-      val resultWithEpoch = try {
-        val response = leaderEndpoint.sendRequest(epochRequest)
-        val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse]
-        debug(s"Receive leaderEpoch response $response; " +
-          s"Skipped request for partitions ${partitionsWithoutEpoch.keys}")
-        responseBody.responses.asScala
-      } catch {
-        case t: Throwable =>
-          warn(s"Error when sending leader epoch request for $partitions", t)
+    try {
+      val response = leaderEndpoint.sendRequest(epochRequest)
+      val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse]
+      debug(s"Received leaderEpoch response $response")
+      responseBody.responses.asScala
+    } catch {
+      case t: Throwable =>
+        warn(s"Error when sending leader epoch request for $partitions", t)

-          // if we get any unexpected exception, mark all partitions with an error
-          undefinedResponseMap(Errors.forException(t), partitionsWithEpoch)
-      }
-      resultWithEpoch ++ resultWithoutEpoch
-    } else {
-      // just generate a response with no error but UNDEFINED_OFFSET so that we can fall back to truncating using
-      // high watermark in maybeTruncate()
-      undefinedResponseMap(Errors.NONE, partitions)
+        // if we get any unexpected exception, mark all partitions with an error
+        val error = Errors.forException(t)
+        partitions.map { case (tp, _) =>
+          tp -> new EpochEndOffset(error, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+        }
     }
   }
 
+  override def isOffsetForLeaderEpochSupported: Boolean = brokerSupportsLeaderEpochRequest
+
+
   /**
   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
    *  the quota is exceeded and the replica is not in sync.
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
index a8db688..79a11c7 100644
--- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -29,15 +29,13 @@ trait LeaderEpochCheckpoint {
   def read(): Seq[EpochEntry]
 }
 
-object LeaderEpochFile {
-  private val LeaderEpochCheckpointFilename = "leader-epoch-checkpoint"
-  def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
-}
-
 object LeaderEpochCheckpointFile {
+  private val LeaderEpochCheckpointFilename = "leader-epoch-checkpoint"
   private val WhiteSpacesPattern = Pattern.compile("\\s+")
   private val CurrentVersion = 0
 
+  def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
+
   object Formatter extends CheckpointFileFormatter[EpochEntry] {
 
     override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index f47d3bd..0c885b7 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -84,13 +84,17 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def nonEmpty: Boolean = inReadLock(lock) {
+    epochs.nonEmpty
+  }
+
   /**
-   * Returns the current Leader Epoch. This is the latest epoch
+   * Returns the current Leader Epoch if one exists. This is the latest epoch
    * which has messages assigned to it.
    */
-  def latestEpoch: Int = {
+  def latestEpoch: Option[Int] = {
     inReadLock(lock) {
-      if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
+      epochs.lastOption.map(_.epoch)
     }
   }
 
@@ -125,7 +129,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
           // This may happen if a bootstrapping follower sends a request with 
undefined epoch or
           // a follower is on the older message format where leader epochs are 
not recorded
           (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
-        } else if (requestedEpoch == latestEpoch) {
+        } else if (latestEpoch.contains(requestedEpoch)) {
           // For the leader, the latest epoch is always the current leader epoch that is still being written to.
           // Followers should not have any reason to query for the end offset of the current epoch, but a consumer
           // might if it is verifying its committed offset following a group rebalance. In this case, we return
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index d6effad..c58532d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, ListOffsetRequest}
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
@@ -60,10 +60,7 @@ class PartitionTest {
 
   @Before
   def setup(): Unit = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    val logProps = createLogProperties(Map.empty)
     logConfig = LogConfig(logProps)
 
     tmpDir = TestUtils.tempDir()
@@ -88,6 +85,15 @@ class PartitionTest {
     EasyMock.replay(kafkaZkClient)
   }
 
+  private def createLogProperties(overrides: Map[String, String]): Properties = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    overrides.foreach { case (k, v) => logProps.put(k, v) }
+    logProps
+  }
+
   @After
   def tearDown(): Unit = {
     brokerTopicStats.close()
@@ -124,6 +130,35 @@ class PartitionTest {
   }
 
   @Test
+  def testMakeLeaderDoesNotUpdateEpochCacheForOldFormats(): Unit = {
+    val leaderEpoch = 8
+
+    val logConfig = LogConfig(createLogProperties(Map(
+      LogConfig.MessageFormatVersionProp -> kafka.api.KAFKA_0_10_2_IV0.shortVersion)))
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    log.appendAsLeader(TestUtils.records(List(
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k2".getBytes, "v2".getBytes)),
+      magicValue = RecordVersion.V1.value
+    ), leaderEpoch = 0)
+    log.appendAsLeader(TestUtils.records(List(
+      new SimpleRecord("k3".getBytes, "v3".getBytes),
+      new SimpleRecord("k4".getBytes, "v4".getBytes)),
+      magicValue = RecordVersion.V1.value
+    ), leaderEpoch = 5)
+    assertEquals(4, log.logEndOffset)
+
+    val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
+    assertEquals(Some(4), partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
+    assertEquals(None, log.latestEpoch)
+
+    val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of[Integer](leaderEpoch),
+      leaderEpoch = leaderEpoch, fetchOnlyFromLeader = true)
+    assertEquals(EpochEndOffset.UNDEFINED_EPOCH_OFFSET, epochEndOffset.endOffset)
+    assertEquals(EpochEndOffset.UNDEFINED_EPOCH, epochEndOffset.leaderEpoch)
+  }
+
+  @Test
   // Verify that partition.removeFutureLocalReplica() and 
partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
   def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
     val latch = new CountDownLatch(1)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b4dc807..163001f 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -25,6 +25,7 @@ import java.util.{Optional, Properties}
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
+import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, 
LogDirFailureChannel}
 import kafka.utils._
@@ -1735,7 +1736,7 @@ class LogTest {
 
     // The cache can be updated directly after a leader change.
     // The new latest offset should reflect the updated epoch.
-    log.leaderEpochCache.assign(2, 2L)
+    log.maybeAssignEpochStartOffset(2, 2L)
 
     assertEquals(Some(new 
TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
       log.fetchOffsetByTimestamp(ListOffsetRequest.LATEST_TIMESTAMP))
@@ -2159,6 +2160,80 @@ class LogTest {
   }
 
   @Test
+  def testWriteLeaderEpochCheckpointAfterDirectoryRename(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.latestEpoch)
+
+    // Ensure that after a directory rename, the epoch cache is written to the right location
+    val tp = Log.parseTopicPartitionName(log.dir)
+    log.renameDir(Log.logDeleteDirName(tp))
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
+    assertEquals(Some(10), log.latestEpoch)
+    assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
+    assertFalse(LeaderEpochCheckpointFile.newFile(this.logDir).exists())
+  }
+
+  @Test
+  def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.latestEpoch)
+    log.close()
+
+    // reopen the log with an older message format version and check the cache
+    val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+      maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_10_2_IV0.shortVersion)
+    val reopened = createLog(logDir, downgradedLogConfig)
+    assertLeaderEpochCacheEmpty(reopened)
+
+    reopened.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("bar".getBytes())),
+      magicValue = RecordVersion.V1.value), leaderEpoch = 5)
+    assertLeaderEpochCacheEmpty(reopened)
+  }
+
+  @Test
+  def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.latestEpoch)
+
+    val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+      maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_10_2_IV0.shortVersion)
+    log.updateConfig(Set(LogConfig.MessageFormatVersionProp), 
downgradedLogConfig)
+    assertLeaderEpochCacheEmpty(log)
+
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("bar".getBytes())),
+      magicValue = RecordVersion.V1.value), leaderEpoch = 5)
+    assertLeaderEpochCacheEmpty(log)
+  }
+
+  @Test
+  def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+      maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_10_2_IV0.shortVersion)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("bar".getBytes())),
+      magicValue = RecordVersion.V1.value), leaderEpoch = 5)
+    assertLeaderEpochCacheEmpty(log)
+
+    val upgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+      maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_11_0_IV0.shortVersion)
+    log.updateConfig(Set(LogConfig.MessageFormatVersionProp), 
upgradedLogConfig)
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.latestEpoch)
+  }
+
+  private def assertLeaderEpochCacheEmpty(log: Log): Unit = {
+    assertEquals(None, log.leaderEpochCache)
+    assertEquals(None, log.latestEpoch)
+    assertFalse(LeaderEpochCheckpointFile.newFile(log.dir).exists())
+  }
+
+  @Test
   def testOverCompactedLogRecoveryMultiRecord(): Unit = {
     // append some messages to create some segments
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
@@ -2649,8 +2724,8 @@ class LogTest {
     for (_ <- 0 until 100)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.leaderEpochCache.assign(0, 40)
-    log.leaderEpochCache.assign(1, 90)
+    log.maybeAssignEpochStartOffset(0, 40)
+    log.maybeAssignEpochStartOffset(1, 90)
 
     // segments are not eligible for deletion if no high watermark has been set
     val numSegments = log.numberOfSegments
@@ -2736,7 +2811,7 @@ class LogTest {
   }
 
   def epochCache(log: Log): LeaderEpochFileCache = {
-    log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache]
+    log.leaderEpochCache.get
   }
 
   @Test
@@ -2866,7 +2941,7 @@ class LogTest {
     //Given this partition is on leader epoch 72
     val epoch = 72
     val log = createLog(logDir, LogConfig())
-    log.leaderEpochCache.assign(epoch, records.size)
+    log.maybeAssignEpochStartOffset(epoch, records.size)
 
     //When appending messages as a leader (i.e. assignOffsets = true)
     for (record <- records)
@@ -2903,7 +2978,7 @@ class LogTest {
     for (i <- records.indices)
       log.appendAsFollower(recordsForEpoch(i))
 
-    assertEquals(42, log.leaderEpochCache.latestEpoch)
+    assertEquals(Some(42), log.latestEpoch)
   }
 
   @Test
@@ -2957,7 +3032,7 @@ class LogTest {
   }
 
   @Test
-  def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
+  def shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() {
     def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
       TestUtils.records(Seq(new SimpleRecord("value".getBytes)),
         baseOffset = startOffset, partitionLeaderEpoch = epoch)
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index e9e4e33..5a12047 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -281,6 +281,72 @@ class AbstractFetcherThreadTest {
   }
 
   @Test
+  def testTruncateToHighWatermarkIfLeaderEpochRequestNotSupported(): Unit = {
+    val highWatermark = 2L
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread {
+      override def truncate(topicPartition: TopicPartition, truncationState: 
OffsetTruncationState): Unit = {
+        assertEquals(highWatermark, truncationState.offset)
+        assertTrue(truncationState.truncationCompleted)
+        super.truncate(topicPartition, truncationState)
+      }
+
+      override def fetchEpochEndOffsets(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] =
+        throw new UnsupportedOperationException
+
+      override protected def isOffsetForLeaderEpochSupported: Boolean = false
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, 
leaderEpoch = 5, highWatermark)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(highWatermark, 
leaderEpoch = 5)))
+
+    fetcher.doWork()
+
+    assertEquals(highWatermark, replicaState.logEndOffset)
+    assertEquals(highWatermark, fetcher.fetchState(partition).get.fetchOffset)
+    assertTrue(fetcher.fetchState(partition).get.isReadyForFetch)
+  }
+
+  @Test
+  def testTruncateToHighWatermarkIfLeaderEpochInfoNotAvailable(): Unit = {
+    val highWatermark = 2L
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread {
+      override def truncate(topicPartition: TopicPartition, truncationState: 
OffsetTruncationState): Unit = {
+        assertEquals(highWatermark, truncationState.offset)
+        assertTrue(truncationState.truncationCompleted)
+        super.truncate(topicPartition, truncationState)
+      }
+
+      override def fetchEpochEndOffsets(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] =
+        throw new UnsupportedOperationException
+
+      override def latestEpoch(topicPartition: TopicPartition): Option[Int] = 
None
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, 
leaderEpoch = 5, highWatermark)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(highWatermark, 
leaderEpoch = 5)))
+
+    fetcher.doWork()
+
+    assertEquals(highWatermark, replicaState.logEndOffset)
+    assertEquals(highWatermark, fetcher.fetchState(partition).get.fetchOffset)
+    assertTrue(fetcher.fetchState(partition).get.isReadyForFetch)
+  }
+
+  @Test
   def testTruncationSkippedIfNoEpochChange(): Unit = {
     val partition = new TopicPartition("topic", 0)
 
@@ -525,8 +591,8 @@ class AbstractFetcherThreadTest {
 
     val fetcher = new MockFetcherThread {
       var fetchEpochsFromLeaderOnce = false
-      override def fetchEpochsFromLeader(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] = {
-        val fetchedEpochs = super.fetchEpochsFromLeader(partitions)
+      override def fetchEpochEndOffsets(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] = {
+        val fetchedEpochs = super.fetchEpochEndOffsets(partitions)
         if (!fetchEpochsFromLeaderOnce) {
           // leader epoch changes while fetching epochs from leader
           removePartitions(Set(partition))
@@ -568,9 +634,9 @@ class AbstractFetcherThreadTest {
   def 
testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs():
 Unit = {
     val partition = new TopicPartition("topic", 0)
     val fetcher = new MockFetcherThread {
-      override def fetchEpochsFromLeader(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] = {
+      override def fetchEpochEndOffsets(partitions: Map[TopicPartition, 
EpochData]): Map[TopicPartition, EpochEndOffset] = {
         val unrequestedTp = new TopicPartition("topic2", 0)
-        super.fetchEpochsFromLeader(partitions) + (unrequestedTp -> new 
EpochEndOffset(0, 0))
+        super.fetchEpochEndOffsets(partitions) + (unrequestedTp -> new 
EpochEndOffset(0, 0))
       }
     }
 
@@ -754,7 +820,7 @@ class AbstractFetcherThreadTest {
       new EpochEndOffset(Errors.NONE, EpochEndOffset.UNDEFINED_EPOCH, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
     }
 
-    override def fetchEpochsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+    override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
       val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]()
       partitions.foreach { case (partition, epochData) =>
         val leaderState = leaderPartitionState(partition)
@@ -764,6 +830,8 @@ class AbstractFetcherThreadTest {
       endOffsets
     }
 
+    override protected def isOffsetForLeaderEpochSupported: Boolean = true
+
     override def fetchFromLeader(fetchRequest: FetchRequest.Builder): 
Seq[(TopicPartition, FetchData)] = {
       fetchRequest.fetchData.asScala.map { case (partition, fetchData) =>
         val leaderState = leaderPartitionState(partition)
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index d5bdf14..006067e 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
 import kafka.log.{Log, LogManager}
-import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -34,7 +33,6 @@ import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.{HashMap, Map}
 
-
 class IsrExpirationTest {
 
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, 
Int), Seq[Int]]()
@@ -253,9 +251,7 @@ class IsrExpirationTest {
 
   private def logMock: Log = {
     val log: Log = EasyMock.createMock(classOf[Log])
-    val cache: LeaderEpochFileCache = 
EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
-    EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
     
EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.replay(log)
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index a4fbaf2..d21e1e1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -21,7 +21,6 @@ import java.util.Optional
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.LogManager
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -83,7 +82,7 @@ class ReplicaAlterLogDirsThreadTest {
       quota = null,
       brokerTopicStats = null)
 
-    val result = thread.fetchEpochsFromLeader(Map(
+    val result = thread.fetchEpochEndOffsets(Map(
       t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
leaderEpochT1p0),
       t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
leaderEpochT1p1)))
 
@@ -128,7 +127,7 @@ class ReplicaAlterLogDirsThreadTest {
       quota = null,
       brokerTopicStats = null)
 
-    val result = thread.fetchEpochsFromLeader(Map(
+    val result = thread.fetchEpochEndOffsets(Map(
       t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
leaderEpoch),
       t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
leaderEpoch)))
 
@@ -150,8 +149,6 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
     val quotaManager: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochsT1p0: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
-    val futureReplicaLeaderEpochsT1p1: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaT1p0: Replica = createNiceMock(classOf[Replica])
     val replicaT1p1: Replica = createNiceMock(classOf[Replica])
@@ -178,19 +175,19 @@ class ReplicaAlterLogDirsThreadTest {
     expect(partitionT1p0.truncateTo(capture(truncateCaptureT1p0), 
anyBoolean())).anyTimes()
     expect(partitionT1p1.truncateTo(capture(truncateCaptureT1p1), 
anyBoolean())).anyTimes()
 
-    
expect(futureReplicaT1p0.epochs).andReturn(Some(futureReplicaLeaderEpochsT1p0)).anyTimes()
     expect(futureReplicaT1p0.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
-    
expect(futureReplicaT1p1.epochs).andReturn(Some(futureReplicaLeaderEpochsT1p1)).anyTimes()
     expect(futureReplicaT1p1.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
 
-    
expect(futureReplicaLeaderEpochsT1p0.latestEpoch).andReturn(leaderEpoch).anyTimes()
-    
expect(futureReplicaLeaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch,
 futureReplicaLEO)).anyTimes()
+    
expect(futureReplicaT1p0.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(futureReplicaT1p0.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch))).anyTimes()
     expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, 
fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch, replicaT1p0LEO))
       .anyTimes()
 
-    
expect(futureReplicaLeaderEpochsT1p1.latestEpoch).andReturn(leaderEpoch).anyTimes()
-    
expect(futureReplicaLeaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch,
 futureReplicaLEO)).anyTimes()
+    
expect(futureReplicaT1p1.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(futureReplicaT1p1.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch))).anyTimes()
     expect(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, 
fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch, replicaT1p1LEO))
       .anyTimes()
@@ -198,8 +195,8 @@ class ReplicaAlterLogDirsThreadTest {
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplicaT1p0, 
partitionT1p0, replicaManager, responseCallback)
 
-    replay(futureReplicaLeaderEpochsT1p0, futureReplicaLeaderEpochsT1p1, 
replicaManager, logManager, quotaManager,
-      replicaT1p0, replicaT1p1, futureReplicaT1p0, partitionT1p0, 
partitionT1p1)
+    replay(replicaManager, logManager, quotaManager, replicaT1p0, replicaT1p1,
+      futureReplicaT1p0, partitionT1p0, partitionT1p1)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -229,7 +226,6 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
     val quotaManager: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replica: Replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values 
for both future replicas
@@ -250,27 +246,28 @@ class ReplicaAlterLogDirsThreadTest {
     
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
 
     expect(partition.truncateTo(capture(truncateToCapture), 
EasyMock.eq(true))).anyTimes()
-    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
     expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
-    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
-    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch - 
2).once()
+    expect(futureReplica.latestEpoch).andReturn(Some(leaderEpoch)).once()
+    expect(futureReplica.latestEpoch).andReturn(Some(leaderEpoch - 2)).once()
 
     // leader replica truncated and fetched new offsets with new leader epoch
     expect(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, 
fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch - 1, replicaLEO))
       .anyTimes()
     // but future replica does not know about this leader epoch, so returns a 
smaller leader epoch
-    expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch - 
1)).andReturn((leaderEpoch - 2, futureReplicaLEO)).anyTimes()
+    expect(futureReplica.endOffsetForEpoch(leaderEpoch - 1)).andReturn(
+      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2))).anyTimes()
     // finally, the leader replica knows about the leader epoch and returns 
end offset
     expect(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch - 2, 
fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch - 2, replicaEpochEndOffset))
       .anyTimes()
-    expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch - 
2)).andReturn((leaderEpoch - 2, futureReplicaEpochEndOffset)).anyTimes()
+    expect(futureReplica.endOffsetForEpoch(leaderEpoch - 2)).andReturn(
+      Some(OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 
2))).anyTimes()
 
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
 
-    replay(futureReplicaLeaderEpochs, replicaManager, logManager, 
quotaManager, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, 
partition)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -305,7 +302,6 @@ class ReplicaAlterLogDirsThreadTest {
     val logManager: LogManager = createMock(classOf[LogManager])
     val replica: Replica = createNiceMock(classOf[Replica])
     val futureReplica: Replica = createNiceMock(classOf[Replica])
-    val futureReplicaLeaderEpochs: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => 
Unit]  = EasyMock.newCapture()
@@ -321,13 +317,12 @@ class ReplicaAlterLogDirsThreadTest {
 
     expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
-    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
 
     // pretend this is a completely new future replica, with no leader epochs 
recorded
-    
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).anyTimes()
+    expect(futureReplica.latestEpoch).andReturn(None).anyTimes()
 
     stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
-    replay(replicaManager, logManager, quotaManager, 
futureReplicaLeaderEpochs, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, 
partition)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -357,7 +352,6 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
     val quotaManager: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replica: Replica = createNiceMock(classOf[Replica])
     val futureReplica: Replica = createNiceMock(classOf[Replica])
@@ -375,9 +369,9 @@ class ReplicaAlterLogDirsThreadTest {
     expect(partition.truncateTo(capture(truncated), isFuture = 
EasyMock.eq(true))).once()
 
     
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
-    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
-    
expect(futureReplicaLeaderEpochs.latestEpoch).andStubReturn(futureReplicaLeaderEpoch)
-    
expect(futureReplicaLeaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch,
 futureReplicaLEO))
+    
expect(futureReplica.latestEpoch).andStubReturn(Some(futureReplicaLeaderEpoch))
+    
expect(futureReplica.endOffsetForEpoch(futureReplicaLeaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
     expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
     
expect(replicaManager.localReplica(t1p0)).andReturn(Some(replica)).anyTimes()
     
expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes()
@@ -406,7 +400,7 @@ class ReplicaAlterLogDirsThreadTest {
         }
       }).anyTimes()
 
-    replay(futureReplicaLeaderEpochs, replicaManager, logManager, 
quotaManager, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, 
partition)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -440,7 +434,6 @@ class ReplicaAlterLogDirsThreadTest {
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
     val quotaManager: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replica: Replica = createNiceMock(classOf[Replica])
     val futureReplica: Replica = createNiceMock(classOf[Replica])
@@ -459,15 +452,14 @@ class ReplicaAlterLogDirsThreadTest {
     expect(partition.truncateTo(futureReplicaLEO, isFuture = true)).once()
 
     
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
-    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
-    expect(futureReplicaLeaderEpochs.latestEpoch).andStubReturn(leaderEpoch)
+    expect(futureReplica.latestEpoch).andStubReturn(Some(leaderEpoch))
     expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
-    
expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch,
 futureReplicaLEO))
+    expect(futureReplica.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
 
-    replay(futureReplicaLeaderEpochs, replicaManager, logManager, quotaManager,
-           replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, 
partition)
 
     //Create the fetcher thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index ef39aa5..04e0218 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -22,7 +22,6 @@ import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.log.LogManager
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.ClientResponse
@@ -74,96 +73,11 @@ class ReplicaFetcherThreadTest {
   }
 
   @Test
-  def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
-    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
-    val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
-    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new 
IAnswer[ClientResponse] {
-      override def answer(): ClientResponse = {
-        toFail = true // assert no leader request is sent
-        createMock(classOf[ClientResponse])
-      }
-    })
-    replay(leaderEndpoint)
-    val thread = new ReplicaFetcherThread(
-      name = "bob",
-      fetcherId = 0,
-      sourceBroker = brokerEndPoint,
-      brokerConfig = config,
-      replicaMgr = null,
-      metrics =  new Metrics(),
-      time = new SystemTime(),
-      quota = null,
-      leaderEndpointBlockingSend = None)
-
-    val result = thread.fetchEpochsFromLeader(Map(
-      t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
0),
-      t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
0)))
-
-    val expected = Map(
-      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET),
-      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
-    )
-
-    assertFalse("Leader request should not have been sent", toFail)
-    assertEquals("results from leader epoch request should have undefined 
offset", expected, result)
-  }
-
-  /**
-    * If a partition doesn't have an epoch in the cache, this means it either
-    *   does not support epochs (log message format below 0.11) or it is a 
bootstrapping follower.
-    *
-    * In both cases, the partition has an undefined epoch
-    *   and there is no use to send the request, as we know the broker will 
answer with that epoch
-    */
-  @Test
-  def shouldNotSendEpochRequestIfLastEpochUndefinedForAllPartitions(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
-    val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
-
-    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new 
IAnswer[ClientResponse] {
-      override def answer(): ClientResponse = {
-        toFail = true // assert no leader request is sent
-        createMock(classOf[ClientResponse])
-      }
-    })
-    replay(leaderEndpoint)
-    val thread = new ReplicaFetcherThread(
-      name = "bob",
-      fetcherId = 0,
-      sourceBroker = brokerEndPoint,
-      brokerConfig = config,
-      replicaMgr = null,
-      metrics =  new Metrics(),
-      time = new SystemTime(),
-      quota = null,
-      leaderEndpointBlockingSend = Some(leaderEndpoint))
-
-
-    val result = thread.fetchEpochsFromLeader(Map(
-      t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
UNDEFINED_EPOCH),
-      t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
UNDEFINED_EPOCH)))
-
-    val expected = Map(
-      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET),
-      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
-    )
-
-    assertFalse("Leader request should not have been sent", toFail)
-    assertEquals("results from leader epoch request should have undefined 
offset", expected, result)
-  }
-
-  @Test
   def shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions(): Unit 
= {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
 
     //Setup all dependencies
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -173,13 +87,13 @@ class ReplicaFetcherThreadTest {
     val leaderEpoch = 5
 
     //Stubs
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
     expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(0)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
-    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
-    expect(leaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).once()  // 
t2p1 doesnt support epochs
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
0)).anyTimes()
+    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).once()
+    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).once()
+    expect(replica.latestEpoch).andReturn(None).once()  // t2p1 doesnt support 
epochs
+    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(0, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
@@ -187,12 +101,11 @@ class ReplicaFetcherThreadTest {
     //Expectations
     expect(partition.truncateTo(anyLong(), anyBoolean())).once
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+    replay(replicaManager, logManager, quota, replica)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
-      t1p1 -> new EpochEndOffset(leaderEpoch, 1),
-      t2p1 -> new EpochEndOffset(-1, 1)).asJava
+      t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava
 
     //Create the fetcher thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, 
brokerEndPoint, new SystemTime())
@@ -277,7 +190,7 @@ class ReplicaFetcherThreadTest {
       quota = null,
       leaderEndpointBlockingSend = Some(mockBlockingSend))
 
-    val result = thread.fetchEpochsFromLeader(Map(
+    val result = thread.fetchEpochEndOffsets(Map(
       t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
0),
       t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 
0)))
 
@@ -295,7 +208,6 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -305,11 +217,11 @@ class ReplicaFetcherThreadTest {
     val leaderEpoch = 5
 
     //Stubs
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
     expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(0)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
0)).anyTimes()
+    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(0, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
@@ -317,7 +229,7 @@ class ReplicaFetcherThreadTest {
     //Expectations
     expect(partition.truncateTo(anyLong(), anyBoolean())).once
 
-    replay(leaderEpochs, replicaManager, logManager, replica)
+    replay(replicaManager, logManager, replica)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new 
EpochEndOffset(leaderEpoch, 1)).asJava
@@ -356,7 +268,6 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, 
"localhost:1234").map(KafkaConfig.fromProps)
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -368,16 +279,16 @@ class ReplicaFetcherThreadTest {
 
     //Stubs
     expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
     expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
1)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes()
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
initialLEO)).anyTimes()
+    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(initialLEO, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
     val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 156), t2p1 
-> new EpochEndOffset(leaderEpoch, 172)).asJava
@@ -405,7 +316,6 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, 
"localhost:1234").map(KafkaConfig.fromProps)
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -418,16 +328,15 @@ class ReplicaFetcherThreadTest {
 
     //Stubs
     expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
     expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
3)).anyTimes()
-    
expect(leaderEpochs.latestEpoch).andReturn(leaderEpochAtFollower).anyTimes()
-    
expect(leaderEpochs.endOffsetFor(leaderEpochAtLeader)).andReturn((UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET)).anyTimes()
+    
expect(replica.latestEpoch).andReturn(Some(leaderEpochAtFollower)).anyTimes()
+    
expect(replica.endOffsetForEpoch(leaderEpochAtLeader)).andReturn(None).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
     val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpochAtLeader, 
156),
@@ -459,7 +368,6 @@ class ReplicaFetcherThreadTest {
 
     // Setup all dependencies
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -470,17 +378,18 @@ class ReplicaFetcherThreadTest {
 
     // Stubs
     expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
     expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
2)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5)
-    expect(leaderEpochs.endOffsetFor(4)).andReturn((3, 120)).anyTimes()
-    expect(leaderEpochs.endOffsetFor(3)).andReturn((3, 120)).anyTimes()
+    expect(replica.latestEpoch).andReturn(Some(5)).anyTimes()
+    expect(replica.endOffsetForEpoch(4)).andReturn(
+      Some(OffsetAndEpoch(120, 3))).anyTimes()
+    expect(replica.endOffsetForEpoch(3)).andReturn(
+      Some(OffsetAndEpoch(120, 3))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     // Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(4, 155), t1p1 -> new 
EpochEndOffset(4, 143)).asJava
@@ -530,7 +439,6 @@ class ReplicaFetcherThreadTest {
 
     // Setup all dependencies
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -541,17 +449,18 @@ class ReplicaFetcherThreadTest {
 
     // Stubs
     expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
     expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
2)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5)
-    expect(leaderEpochs.endOffsetFor(4)).andReturn((3, 120)).anyTimes()
-    expect(leaderEpochs.endOffsetFor(3)).andReturn((3, 120)).anyTimes()
+    expect(replica.latestEpoch).andReturn(Some(5)).anyTimes()
+    expect(replica.endOffsetForEpoch(4)).andReturn(
+      Some(OffsetAndEpoch(120, 3))).anyTimes()
+    expect(replica.endOffsetForEpoch(3)).andReturn(
+      Some(OffsetAndEpoch(120, 3))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     // Define the offsets for the OffsetsForLeaderEpochResponse with undefined 
epoch to simulate
     // older protocol version
@@ -591,7 +500,6 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, 
"localhost:1234").map(KafkaConfig.fromProps)
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -603,14 +511,13 @@ class ReplicaFetcherThreadTest {
 
     //Stubs
     expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLeo)).anyTimes()
     expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(initialFetchOffset)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5)
+    expect(replica.latestEpoch).andReturn(Some(5))
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
-    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
     val offsetsReply = Map(t1p0 -> new 
EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava
@@ -636,7 +543,6 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, 
"localhost:1234").map(KafkaConfig.fromProps)
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -650,15 +556,15 @@ class ReplicaFetcherThreadTest {
     //Stubs
     expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(highWaterMark)).anyTimes()
     expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLeo)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
+    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
     // this is for the last reply with EpochEndOffset(5, 156)
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
initialLeo)).anyTimes()
+    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(initialLeo, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
-    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
     val offsetsReply = mutable.Map(
@@ -694,7 +600,6 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -704,16 +609,16 @@ class ReplicaFetcherThreadTest {
     val leaderEpoch = 4
 
     //Stub return values
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
     expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(0)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
0)).anyTimes()
+    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+      Some(OffsetAndEpoch(0, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+    replay(replicaManager, logManager, quota, replica)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsetsReply = Map(
@@ -747,7 +652,6 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota: ReplicationQuotaManager = 
createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
     val replica: Replica = createNiceMock(classOf[Replica])
@@ -756,16 +660,15 @@ class ReplicaFetcherThreadTest {
 
     //Stub return values
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
-    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
     expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
2)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5)
-    expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLEO)).anyTimes()
+    expect(replica.latestEpoch).andReturn(Some(5)).anyTimes()
+    
expect(replica.endOffsetForEpoch(5)).andReturn(Some(OffsetAndEpoch(initialLEO, 
5))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsetsReply = Map(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 08aa624..65b62d0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,6 @@ import kafka.log.{Log, LogConfig, LogManager, 
ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
-import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
@@ -602,11 +601,6 @@ class ReplicaManagerTest {
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new 
LogDirFailureChannel(config.logDirs.size)
-    val mockLeaderEpochCache: LeaderEpochFileCache = 
EasyMock.createMock(classOf[LeaderEpochFileCache])
-    
EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
-    EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
-      .andReturn((leaderEpochFromLeader, localLogOffset))
-    EasyMock.replay(mockLeaderEpochCache)
     val mockLog = new Log(
       dir = new File(new File(config.logDirs.head), s"$topic-0"),
       config = LogConfig(),
@@ -622,7 +616,12 @@ class ReplicaManagerTest {
         new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 
30000),
       logDirFailureChannel = mockLogDirFailureChannel) {
 
-      override def leaderEpochCache: LeaderEpochFileCache = 
mockLeaderEpochCache
+      override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] 
= {
+        assertEquals(leaderEpoch, leaderEpochFromLeader)
+        Some(OffsetAndEpoch(localLogOffset, leaderEpochFromLeader))
+      }
+
+      override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
 
       override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
 
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 1d70539..ac6dedc 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -435,7 +435,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends 
ZooKeeperTestHarness
     producer = createProducer //TODO not sure why we need to recreate the 
producer, but it doesn't reconnect if we don't
   }
 
-  private def epochCache(broker: KafkaServer): LeaderEpochFileCache = 
getLog(broker, 0).leaderEpochCache
+  private def epochCache(broker: KafkaServer): LeaderEpochFileCache = 
getLog(broker, 0).leaderEpochCache.get
 
   private def latestRecord(leader: KafkaServer, offset: Int = -1, partition: 
Int = 0): RecordBatch = {
     getLog(leader, partition).activeSegment.read(0, None, Integer.MAX_VALUE)
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index fc4dcfd..e3b96db 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -48,7 +48,7 @@ class LeaderEpochFileCacheTest {
     logEndOffset = 11
 
     //Then
-    assertEquals(2, cache.latestEpoch)
+    assertEquals(Some(2), cache.latestEpoch)
     assertEquals(EpochEntry(2, 10), cache.epochEntries(0))
     assertEquals((2, logEndOffset), cache.endOffsetFor(2)) //should match 
logEndOffset
   }
@@ -256,7 +256,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8
 
     //Then later epochs will be removed
-    assertEquals(1, cache.latestEpoch)
+    assertEquals(Some(1), cache.latestEpoch)
 
     //Then end offset for epoch 1 will have changed
     assertEquals((1, 8), cache.endOffsetFor(1))
@@ -285,7 +285,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0
 
     //Then epoch should go up
-    assertEquals(1, cache.latestEpoch)
+    assertEquals(Some(1), cache.latestEpoch)
     //offset for 1 should still be 0
     assertEquals((1, 0), cache.endOffsetFor(1))
     //offset for epoch 0 should still be 0
@@ -322,7 +322,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3
 
     //Then epoch should stay, offsets should grow
-    assertEquals(0, cache.latestEpoch)
+    assertEquals(Some(0), cache.latestEpoch)
     assertEquals((0, logEndOffset), cache.endOffsetFor(0))
 
     //When messages arrive with greater epoch
@@ -330,7 +330,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5
     cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
 
-    assertEquals(1, cache.latestEpoch)
+    assertEquals(Some(1), cache.latestEpoch)
     assertEquals((1, logEndOffset), cache.endOffsetFor(1))
 
     //When
@@ -338,7 +338,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8
     cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9
 
-    assertEquals(2, cache.latestEpoch)
+    assertEquals(Some(2), cache.latestEpoch)
     assertEquals((2, logEndOffset), cache.endOffsetFor(2))
 
     //Older epochs should return the start offset of the first message in the 
subsequent epoch.
@@ -483,7 +483,7 @@ class LeaderEpochFileCacheTest {
     cache.truncateFromEnd(endOffset = 9)
 
     //Then should keep the preceding epochs
-    assertEquals(3, cache.latestEpoch)
+    assertEquals(Some(3), cache.latestEpoch)
     assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), 
cache.epochEntries)
   }
 
@@ -532,7 +532,7 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldFetchLatestEpochOfEmptyCache(): Unit = {
     //Then
-    assertEquals(-1, cache.latestEpoch)
+    assertEquals(None, cache.latestEpoch)
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 7adc204..3d3b342 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -42,18 +42,16 @@ class OffsetsForLeaderEpochTest {
   @Test
   def shouldGetEpochsFromReplica(): Unit = {
     //Given
-    val epochAndOffset = (5, 42L)
+    val offsetAndEpoch = OffsetAndEpoch(42L, 5)
     val epochRequested: Integer = 5
     val request = Map(tp -> new 
OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), epochRequested))
 
     //Stubs
     val mockLog: Log = createNiceMock(classOf[Log])
-    val mockCache: LeaderEpochFileCache = 
createNiceMock(classOf[LeaderEpochFileCache])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
-    expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
-    expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
+    
expect(mockLog.endOffsetForEpoch(epochRequested)).andReturn(Some(offsetAndEpoch))
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
-    replay(mockCache, mockLog, logManager)
+    replay(mockLog, logManager)
 
     // create a replica manager with 1 partition that has 1 replica
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, 
logManager, new AtomicBoolean(false),
@@ -68,7 +66,7 @@ class OffsetsForLeaderEpochTest {
     val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.NONE, epochAndOffset._1, 
epochAndOffset._2), response(tp))
+    assertEquals(new EpochEndOffset(Errors.NONE, offsetAndEpoch.leaderEpoch, 
offsetAndEpoch.offset), response(tp))
   }
 
   @Test

Reply via email to