This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9679c44   KAFKA-6361: Fix log divergence between leader and follower 
after fast leader fail over (#4882)
9679c44 is described below

commit 9679c44d2b521b5c627e7bde375c0883f5857e0c
Author: Anna Povzner <[email protected]>
AuthorDate: Wed May 9 18:49:51 2018 -0700

     KAFKA-6361: Fix log divergence between leader and follower after fast 
leader fail over (#4882)
    
    Implementation of KIP-279 as described here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over
    
    In summary:
    - Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
    - Leader replies with the pair( largest epoch less than or equal to the 
requested epoch, the end offset of this epoch)
    - If Follower does not know about the leader epoch that leader replies 
with, it truncates to the end offset of largest leader epoch less than leader 
epoch that leader replied with, and sends another OffsetForLeaderEpoch request. 
That request contains the largest leader epoch less than leader epoch that 
leader replied with.
    
    Reviewers: Dong Lin <[email protected]>, Jun Rao <[email protected]>
---
 .../apache/kafka/common/protocol/CommonFields.java |   1 +
 .../kafka/common/requests/EpochEndOffset.java      |  25 +-
 .../requests/OffsetsForLeaderEpochRequest.java     |  16 +-
 .../requests/OffsetsForLeaderEpochResponse.java    |  23 +-
 .../kafka/common/requests/RequestResponseTest.java |   8 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |   1 +
 core/src/main/scala/kafka/cluster/Partition.scala  |  11 +-
 .../kafka/consumer/ConsumerFetcherThread.scala     |   4 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |  98 ++++++-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  41 +--
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  58 ++---
 .../main/scala/kafka/server/ReplicaManager.scala   |   4 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  |  40 +--
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../kafka/server/AbstractFetcherThreadTest.scala   |   3 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  95 ++++++-
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 289 +++++++++++++++++----
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  84 +++++-
 .../server/epoch/LeaderEpochFileCacheTest.scala    |  64 ++---
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   3 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  12 +-
 .../util/ReplicaFetcherMockBlockingSend.scala      |  16 +-
 docs/upgrade.html                                  |  52 ++++
 24 files changed, 752 insertions(+), 200 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index a436dff..7f43caf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -26,6 +26,7 @@ public class CommonFields {
     public static final Field.Int32 PARTITION_ID = new 
Field.Int32("partition", "Topic partition id");
     public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", 
"Response error code");
     public static final Field.NullableStr ERROR_MESSAGE = new 
Field.NullableStr("error_message", "Response error message");
+    public static final Field.Int32 LEADER_EPOCH = new 
Field.Int32("leader_epoch", "The epoch");
 
     // Group APIs
     public static final Field.Str GROUP_ID = new Field.Str("group_id", "The 
unique group identifier");
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java 
b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
index 0965e36..ce938aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
@@ -20,24 +20,29 @@ import org.apache.kafka.common.protocol.Errors;
 
 import static 
org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
 
+import java.util.Objects;
+
 /**
  * The offset, fetched from a leader, for a particular partition.
  */
 
 public class EpochEndOffset {
     public static final long UNDEFINED_EPOCH_OFFSET = 
NO_PARTITION_LEADER_EPOCH;
-    public static final int UNDEFINED_EPOCH = -1;
+    public static final int UNDEFINED_EPOCH = NO_PARTITION_LEADER_EPOCH;
 
     private Errors error;
+    private int leaderEpoch;  // introduced in V1
     private long endOffset;
 
-    public EpochEndOffset(Errors error, long endOffset) {
+    public EpochEndOffset(Errors error, int leaderEpoch, long endOffset) {
         this.error = error;
+        this.leaderEpoch = leaderEpoch;
         this.endOffset = endOffset;
     }
 
-    public EpochEndOffset(long endOffset) {
+    public EpochEndOffset(int leaderEpoch, long endOffset) {
         this.error = Errors.NONE;
+        this.leaderEpoch = leaderEpoch;
         this.endOffset = endOffset;
     }
 
@@ -53,10 +58,15 @@ public class EpochEndOffset {
         return endOffset;
     }
 
+    public int leaderEpoch() {
+        return leaderEpoch;
+    }
+
     @Override
     public String toString() {
         return "EpochEndOffset{" +
                 "error=" + error +
+                ", leaderEpoch=" + leaderEpoch +
                 ", endOffset=" + endOffset +
                 '}';
     }
@@ -68,14 +78,13 @@ public class EpochEndOffset {
 
         EpochEndOffset that = (EpochEndOffset) o;
 
-        if (error != that.error) return false;
-        return endOffset == that.endOffset;
+        return Objects.equals(error, that.error)
+               && Objects.equals(leaderEpoch, that.leaderEpoch)
+               && Objects.equals(endOffset, that.endOffset);
     }
 
     @Override
     public int hashCode() {
-        int result = (int) error.code();
-        result = 31 * result + (int) (endOffset ^ (endOffset >>> 32));
-        return result;
+        return Objects.hash(error, leaderEpoch, endOffset);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index d0585be..651416d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -50,8 +50,11 @@ public class OffsetsForLeaderEpochRequest extends 
AbstractRequest {
     private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new 
Schema(
             new Field(TOPICS_KEY_NAME, new 
ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0), "An array of topics to get 
epochs for"));
 
+    /* v1 request is the same as v0. Per-partition leader epoch has been added 
to response */
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 = 
OFFSET_FOR_LEADER_EPOCH_REQUEST_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
+        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, 
OFFSET_FOR_LEADER_EPOCH_REQUEST_V1};
     }
 
     private Map<TopicPartition, Integer> epochsByPartition;
@@ -63,12 +66,12 @@ public class OffsetsForLeaderEpochRequest extends 
AbstractRequest {
     public static class Builder extends 
AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
         private Map<TopicPartition, Integer> epochsByPartition = new 
HashMap<>();
 
-        public Builder() {
-            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+        public Builder(short version) {
+            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
         }
 
-        public Builder(Map<TopicPartition, Integer> epochsByPartition) {
-            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+        public Builder(short version, Map<TopicPartition, Integer> 
epochsByPartition) {
+            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
             this.epochsByPartition = epochsByPartition;
         }
 
@@ -150,7 +153,8 @@ public class OffsetsForLeaderEpochRequest extends 
AbstractRequest {
         Errors error = Errors.forException(e);
         Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>();
         for (TopicPartition tp : epochsByPartition.keySet()) {
-            errorResponse.put(tp, new EpochEndOffset(error, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
+            errorResponse.put(tp, new EpochEndOffset(
+                error, EpochEndOffset.UNDEFINED_EPOCH, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
         }
         return new OffsetsForLeaderEpochResponse(errorResponse);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 4a91533..3da49b5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
@@ -34,6 +35,7 @@ import java.util.Map;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
 
 public class OffsetsForLeaderEpochResponse extends AbstractResponse {
@@ -52,8 +54,23 @@ public class OffsetsForLeaderEpochResponse extends 
AbstractResponse {
             new Field(TOPICS_KEY_NAME, new 
ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
                     "An array of topics for which we have leader offsets for 
some requested Partition Leader Epoch"));
 
+
+    // OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 added a per-partition 
leader epoch field,
+    // which specifies which leader epoch the end offset belongs to
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 
= new Schema(
+            ERROR_CODE,
+            PARTITION_ID,
+            LEADER_EPOCH,
+            new Field(END_OFFSET_KEY_NAME, INT64, "The end offset"));
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1 = 
new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1)));
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new 
Schema(
+            new Field(TOPICS_KEY_NAME, new 
ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1),
+                  "An array of topics for which we have leader offsets for 
some requested Partition Leader Epoch"));
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
+        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, 
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1};
     }
 
     private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
@@ -68,8 +85,9 @@ public class OffsetsForLeaderEpochResponse extends 
AbstractResponse {
                 Errors error = 
Errors.forCode(partitionAndEpoch.get(ERROR_CODE));
                 int partitionId = partitionAndEpoch.get(PARTITION_ID);
                 TopicPartition tp = new TopicPartition(topic, partitionId);
+                int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
                 long endOffset = 
partitionAndEpoch.getLong(END_OFFSET_KEY_NAME);
-                epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, 
endOffset));
+                epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, 
leaderEpoch, endOffset));
             }
         }
     }
@@ -110,6 +128,7 @@ public class OffsetsForLeaderEpochResponse extends 
AbstractResponse {
                 Struct partitionStruct = 
topicStruct.instance(PARTITIONS_KEY_NAME);
                 partitionStruct.set(ERROR_CODE, 
partitionEndOffset.getValue().error().code());
                 partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey());
+                partitionStruct.setIfExists(LEADER_EPOCH, 
partitionEndOffset.getValue().leaderEpoch());
                 partitionStruct.set(END_OFFSET_KEY_NAME, 
partitionEndOffset.getValue().endOffset());
                 partitions.add(partitionStruct);
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c63cecd..2422650 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1020,15 +1020,15 @@ public class RequestResponseTest {
         epochs.put(new TopicPartition("topic1", 1), 1);
         epochs.put(new TopicPartition("topic2", 2), 3);
 
-        return new OffsetsForLeaderEpochRequest.Builder(epochs).build();
+        return new 
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
 epochs).build();
     }
 
     private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
         Map<TopicPartition, EpochEndOffset> epochs = new HashMap<>();
 
-        epochs.put(new TopicPartition("topic1", 0), new 
EpochEndOffset(Errors.NONE, 0));
-        epochs.put(new TopicPartition("topic1", 1), new 
EpochEndOffset(Errors.NONE, 1));
-        epochs.put(new TopicPartition("topic2", 2), new 
EpochEndOffset(Errors.NONE, 2));
+        epochs.put(new TopicPartition("topic1", 0), new 
EpochEndOffset(Errors.NONE, 1, 0));
+        epochs.put(new TopicPartition("topic1", 1), new 
EpochEndOffset(Errors.NONE, 1, 1));
+        epochs.put(new TopicPartition("topic2", 2), new 
EpochEndOffset(Errors.NONE, 1, 2));
 
         return new OffsetsForLeaderEpochResponse(epochs);
     }
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala 
b/core/src/main/scala/kafka/api/ApiVersion.scala
index 62b91a0..ff011b2 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -77,6 +77,7 @@ object ApiVersion {
     // and KafkaStorageException for fetch requests.
     "1.1-IV0" -> KAFKA_1_1_IV0,
     "1.1" -> KAFKA_1_1_IV0,
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
     "2.0-IV0" -> KAFKA_2_0_IV0,
     "2.0" -> KAFKA_2_0_IV0
   )
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 93377ba..b9180a4 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -647,15 +647,20 @@ class Partition(val topic: String,
 
   /**
     * @param leaderEpoch Requested leader epoch
-    * @return The last offset of messages published under this leader epoch.
+    * @return The requested leader epoch and the end offset of this leader 
epoch, or if the requested
+    *         leader epoch is unknown, the leader epoch less than the 
requested leader epoch and the end offset
+    *         of this leader epoch. The end offset of a leader epoch is 
defined as the start
+    *         offset of the first leader epoch larger than the leader epoch, 
or else the log end
+    *         offset if the leader epoch is the latest leader epoch.
     */
   def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = {
     inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
-          new EpochEndOffset(NONE, 
leaderReplica.epochs.get.endOffsetFor(leaderEpoch))
+          val (epoch, offset) = 
leaderReplica.epochs.get.endOffsetFor(leaderEpoch)
+          new EpochEndOffset(NONE, epoch, offset)
         case None =>
-          new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+          new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index ac83fa1..9426884 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.consumer
 import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, 
OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
 import kafka.message.ByteBufferMessageSet
-import kafka.server.{AbstractFetcherThread, PartitionFetchState}
+import kafka.server.{AbstractFetcherThread, PartitionFetchState, 
OffsetTruncationState}
 import AbstractFetcherThread.ResultWithPartitions
 import kafka.common.{ErrorMapping, TopicAndPartition}
 
@@ -129,7 +129,7 @@ class ConsumerFetcherThread(consumerIdString: String,
 
   override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): 
Map[TopicPartition, EpochEndOffset] = { Map() }
 
-  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
+  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, 
OffsetTruncationState]] = {
     ResultWithPartitions(Map(), Set())
   }
 }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f919ddf..f27dbfe 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,9 +19,10 @@ package kafka.server
 
 import java.util.concurrent.locks.ReentrantLock
 
-import kafka.cluster.BrokerEndPoint
+import kafka.cluster.{Replica, BrokerEndPoint}
 import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
 import org.apache.kafka.common.errors.{CorruptRecordException, 
KafkaStorageException}
+import org.apache.kafka.common.requests.EpochEndOffset._
 import kafka.common.{ClientIdAndBroker, KafkaException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
@@ -39,6 +40,8 @@ import org.apache.kafka.common.internals.{FatalExitError, 
PartitionStates}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset
 
+import scala.math._
+
 /**
  *  Abstract class for fetching data from multiple partitions from the same 
broker.
  */
@@ -76,7 +79,7 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): 
Map[TopicPartition, EpochEndOffset]
 
-  protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]]
+  protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, 
OffsetTruncationState]]
 
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, 
PartitionFetchState)]): ResultWithPartitions[REQ]
 
@@ -132,7 +135,7 @@ abstract class AbstractFetcherThread(name: String,
         val leaderEpochs = fetchedEpochs.filter { case (tp, _) => 
partitionStates.contains(tp) }
         val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncate(leaderEpochs)
         handlePartitionsWithErrors(partitionsWithError)
-        markTruncationCompleteAndUpdateFetchOffset(fetchOffsets)
+        updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
       }
     }
   }
@@ -272,15 +275,16 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-    * Loop through all partitions, marking them as truncation complete and 
update the fetch offset
+    * Loop through all partitions, updating their fetch offset and maybe 
marking them as
+    * truncation completed if their offsetTruncationState indicates truncation 
completed
     *
-    * @param fetchOffsets the partitions to mark truncation complete
+    * @param fetchOffsets the partitions to update fetch offset and maybe mark 
truncation complete
     */
-  private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: 
Map[TopicPartition, Long]) {
+  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: 
Map[TopicPartition, OffsetTruncationState]) {
     val newStates: Map[TopicPartition, PartitionFetchState] = 
partitionStates.partitionStates.asScala
       .map { state =>
         val maybeTruncationComplete = fetchOffsets.get(state.topicPartition()) 
match {
-          case Some(offset) => PartitionFetchState(offset, state.value.delay, 
truncatingLog = false)
+          case Some(offsetTruncationState) => 
PartitionFetchState(offsetTruncationState.offset, state.value.delay, 
truncatingLog = !offsetTruncationState.truncationCompleted)
           case None => state.value()
         }
         (state.topicPartition(), maybeTruncationComplete)
@@ -288,6 +292,79 @@ abstract class AbstractFetcherThread(name: String,
     partitionStates.set(newStates.asJava)
   }
 
+  /**
+   * Called from ReplicaFetcherThread and ReplicaAlterLogDirsThread 
maybeTruncate for each topic
+   * partition. Returns truncation offset and whether this is the final offset 
to truncate to
+   *
+   * For each topic partition, the offset to truncate to is calculated based 
on leader's returned
+   * epoch and offset:
+   *  -- If the leader replied with undefined epoch offset, we must use the 
high watermark. This can
+   *  happen if 1) the leader is still using message format older than 
KAFKA_0_11_0; 2) the follower
+   *  requested leader epoch < the first leader epoch known to the leader.
+   *  -- If the leader replied with the valid offset but undefined leader 
epoch, we truncate to
+   *  leader's offset if it is lower than follower's Log End Offset. This may 
happen if the
+   *  leader is on the inter-broker protocol version < KAFKA_2_0_IV0
+   *  -- If the leader replied with leader epoch not known to the follower, we 
truncate to the
+   *  end offset of the largest epoch that is smaller than the epoch the 
leader replied with, and
+   *  send OffsetsForLeaderEpochRequest with that leader epoch. In a more rare 
case, where the
+   *  follower was not tracking epochs smaller than the epoch the leader 
replied with, we
+   *  truncate the leader's offset (and do not send any more leader epoch 
requests).
+   *  -- Otherwise, truncate to min(leader's offset, end offset on the 
follower for epoch that
+   *  leader replied with, follower's Log End Offset).
+   *
+   * @param tp                    Topic partition
+   * @param leaderEpochOffset     Epoch end offset received from the leader 
for this topic partition
+   * @param replica               Follower's replica, which is either local 
replica
+   *                              (ReplicaFetcherThread) or future replica 
(ReplicaAlterLogDirsThread)
+   * @param isFutureReplica       true if called from ReplicaAlterLogDirsThread
+   */
+  def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset: 
EpochEndOffset, replica: Replica, isFutureReplica: Boolean = false): 
OffsetTruncationState = {
+    // to make sure we can distinguish log output for fetching from remote 
leader or local replica
+    val followerName = if (isFutureReplica) "future replica" else "follower"
+
+    if (leaderEpochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
+      // truncate to initial offset which is the high watermark for follower 
replica. For
+      // future replica, it is either high watermark of the future replica or 
current
+      // replica's truncation offset (when the current replica truncates, it 
forces future
+      // replica's partition state to 'truncating' and sets initial offset to 
its truncation offset)
+      warn(s"Based on $followerName's leader epoch, leader replied with an 
unknown offset in ${replica.topicPartition}. " +
+           s"The initial fetch offset 
${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
+      OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, 
truncationCompleted = true)
+    } else if (leaderEpochOffset.leaderEpoch == UNDEFINED_EPOCH) {
+      // either leader or follower or both use inter-broker protocol version < 
KAFKA_2_0_IV0
+      // (version 0 of OffsetForLeaderEpoch request/response)
+      warn(s"Leader or $followerName is on protocol version where leader epoch 
is not considered in the OffsetsForLeaderEpoch response. " +
+           s"The leader's offset ${leaderEpochOffset.endOffset} will be used 
for truncation in ${replica.topicPartition}.")
+      OffsetTruncationState(min(leaderEpochOffset.endOffset, 
replica.logEndOffset.messageOffset), truncationCompleted = true)
+    } else {
+      // get (leader epoch, end offset) pair that corresponds to the largest 
leader epoch
+      // less than or equal to the requested epoch.
+      val (followerEpoch, followerEndOffset) = 
replica.epochs.get.endOffsetFor(leaderEpochOffset.leaderEpoch)
+      if (followerEndOffset == UNDEFINED_EPOCH_OFFSET) {
+        // This can happen if the follower was not tracking leader epochs at 
that point (before the
+        // upgrade, or if this broker is new). Since the leader replied with 
epoch <
+        // requested epoch from follower, so should be safe to truncate to 
leader's
+        // offset (this is the same behavior as post-KIP-101 and pre-KIP-279)
+        warn(s"Based on $followerName's leader epoch, leader replied with 
epoch ${leaderEpochOffset.leaderEpoch} " +
+             s"below any $followerName's tracked epochs for 
${replica.topicPartition}. " +
+             s"The leader's offset only ${leaderEpochOffset.endOffset} will be 
used for truncation.")
+        OffsetTruncationState(min(leaderEpochOffset.endOffset, 
replica.logEndOffset.messageOffset), truncationCompleted = true)
+      } else if (followerEpoch != leaderEpochOffset.leaderEpoch) {
+        // the follower does not know about the epoch that leader replied with
+        // we truncate to the end offset of the largest epoch that is smaller 
than the
+        // epoch the leader replied with, and send another offset for leader 
epoch request
+        val intermediateOffsetToTruncateTo = min(followerEndOffset, 
replica.logEndOffset.messageOffset)
+        info(s"Based on $followerName's leader epoch, leader replied with 
epoch ${leaderEpochOffset.leaderEpoch} " +
+             s"unknown to the $followerName for ${replica.topicPartition}. " +
+             s"Will truncate to $intermediateOffsetToTruncateTo and send 
another leader epoch request to the leader.")
+        OffsetTruncationState(intermediateOffsetToTruncateTo, 
truncationCompleted = false)
+      } else {
+        val offsetToTruncateTo = min(followerEndOffset, 
leaderEpochOffset.endOffset)
+        OffsetTruncationState(min(offsetToTruncateTo, 
replica.logEndOffset.messageOffset), truncationCompleted = true)
+      }
+    }
+  }
+
   def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
     partitionMapLock.lockInterruptibly()
     try {
@@ -446,3 +523,10 @@ case class PartitionFetchState(fetchOffset: Long, delay: 
DelayedItem, truncating
 
   override def toString = 
"offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, 
isReadyForFetch, truncatingLog)
 }
+
+case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
+
+  def this(offset: Long) = this(offset, true)
+
+  override def toString = "offset:%d-truncationCompleted:%b".format(offset, 
truncationCompleted)
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala 
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 0faf5dc..30e6c07 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.record.{FileRecords, 
MemoryRecords}
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, Set, mutable}
 
-
 class ReplicaAlterLogDirsThread(name: String,
                                 sourceBroker: BrokerEndPoint,
                                 brokerConfig: KafkaConfig,
@@ -102,7 +101,8 @@ class ReplicaAlterLogDirsThread(name: String,
 
     // Append the leader's messages to the log
     partition.appendRecordsToFutureReplica(records)
-    futureReplica.highWatermark = new 
LogOffsetMetadata(partitionData.highWatermark)
+    val futureReplicaHighWatermark = 
futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+    futureReplica.highWatermark = new 
LogOffsetMetadata(futureReplicaHighWatermark)
     futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
 
     if (partition.maybeReplaceCurrentWithFutureReplica())
@@ -164,17 +164,32 @@ class ReplicaAlterLogDirsThread(name: String,
   def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): 
Map[TopicPartition, EpochEndOffset] = {
     partitions.map { case (tp, epoch) =>
       try {
-        tp -> new EpochEndOffset(Errors.NONE, 
replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch))
+        val (leaderEpoch, leaderOffset) = 
replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch)
+        tp -> new EpochEndOffset(Errors.NONE, leaderEpoch, leaderOffset)
       } catch {
         case t: Throwable =>
           warn(s"Error when getting EpochEndOffset for $tp", t)
-          tp -> new EpochEndOffset(Errors.forException(t), 
UNDEFINED_EPOCH_OFFSET)
+          tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
       }
     }
   }
 
-  def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): 
ResultWithPartitions[Map[TopicPartition, Long]] = {
-    val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, 
Long]
+  /**
+   * Truncate the log for each partition based on current replica's returned 
epoch and offset.
+   *
+   * The logic for finding the truncation offset is the same as in 
ReplicaFetcherThread
+   * and mainly implemented in AbstractFetcherThread.getOffsetTruncationState. 
One difference is
+   * that the initial fetch offset for topic partition could be set to the 
truncation offset of
+   * the current replica if that replica truncates. Otherwise, it is high 
watermark as in ReplicaFetcherThread.
+   *
+   * The reason we have to follow the leader epoch approach for truncating a 
future replica is to
+   * cover the case where a future replica is offline when the current replica 
truncates and
+   * re-replicates offsets that may have already been copied to the future 
replica. In that case,
+   * the future replica may miss "mark for truncation" event and must use the 
offset for leader epoch
+   * exchange with the current replica to truncate to the largest common log 
prefix for the topic partition
+   */
+  def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): 
ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
+    val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, 
OffsetTruncationState]
     val partitionsWithError = mutable.Set[TopicPartition]()
 
     fetchedEpochs.foreach { case (topicPartition, epochOffset) =>
@@ -186,16 +201,10 @@ class ReplicaAlterLogDirsThread(name: String,
           info(s"Retrying leaderEpoch request for partition $topicPartition as 
the current replica reported an error: ${epochOffset.error}")
           partitionsWithError += topicPartition
         } else {
-          val fetchOffset =
-            if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
-              partitionStates.stateValue(topicPartition).fetchOffset
-            else if (epochOffset.endOffset >= 
futureReplica.logEndOffset.messageOffset)
-              futureReplica.logEndOffset.messageOffset
-            else
-              epochOffset.endOffset
-
-          partition.truncateTo(fetchOffset, isFuture = true)
-          fetchOffsets.put(topicPartition, fetchOffset)
+          val offsetTruncationState = getOffsetTruncationState(topicPartition, 
epochOffset, futureReplica, isFutureReplica = true)
+
+          partition.truncateTo(offsetTruncationState.offset, isFuture = true)
+          fetchOffsets.put(topicPartition, offsetTruncationState)
         }
       } catch {
         case e: KafkaStorageException =>
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8344d5b..6805d77 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,7 +21,7 @@ import java.util
 
 import AbstractFetcherThread.ResultWithPartitions
 import kafka.api.{FetchRequest => _, _}
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.server.ReplicaFetcherThread._
 import kafka.server.epoch.LeaderEpochCache
@@ -74,6 +74,9 @@ class ReplicaFetcherThread(name: String,
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
+  private val offsetForLeaderEpochRequestVersion: Short =
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) 1
+    else 0
   private val fetchMetadataSupported = brokerConfig.interBrokerProtocolVersion 
>= KAFKA_1_1_IV0
   private val maxWait = brokerConfig.replicaFetchWaitMaxMs
   private val minBytes = brokerConfig.replicaFetchMinBytes
@@ -286,37 +289,31 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-    * - Truncate the log to the leader's offset for each partition's epoch.
-    * - If the leader's offset is greater, we stick with the Log End Offset
-    *   otherwise we truncate to the leaders offset.
-    * - If the leader replied with undefined epoch offset we must use the high 
watermark
-    */
-  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
-    val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, 
Long]
+   * Truncate the log for each partition's epoch based on leader's returned 
epoch and offset.
+   * The logic for finding the truncation offset is implemented in 
AbstractFetcherThread.getOffsetTruncationState
+   */
+  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, 
OffsetTruncationState]] = {
+    val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, 
OffsetTruncationState]
     val partitionsWithError = mutable.Set[TopicPartition]()
 
-    fetchedEpochs.foreach { case (tp, epochOffset) =>
+    fetchedEpochs.foreach { case (tp, leaderEpochOffset) =>
       try {
         val replica = replicaMgr.getReplicaOrException(tp)
         val partition = replicaMgr.getPartition(tp).get
 
-        if (epochOffset.hasError) {
-          info(s"Retrying leaderEpoch request for partition 
${replica.topicPartition} as the leader reported an error: 
${epochOffset.error}")
+        if (leaderEpochOffset.hasError) {
+          info(s"Retrying leaderEpoch request for partition 
${replica.topicPartition} as the leader reported an error: 
${leaderEpochOffset.error}")
           partitionsWithError += tp
         } else {
-          val fetchOffset =
-            if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
-              warn(s"Based on follower's leader epoch, leader replied with an 
unknown offset in ${replica.topicPartition}. " +
-                s"The initial fetch offset 
${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
-              partitionStates.stateValue(tp).fetchOffset
-            } else if (epochOffset.endOffset >= 
replica.logEndOffset.messageOffset)
-              logEndOffset(replica, epochOffset)
-            else
-              epochOffset.endOffset
-
-          partition.truncateTo(fetchOffset, isFuture = false)
-          
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId,
 tp, fetchOffset)
-          fetchOffsets.put(tp, fetchOffset)
+          val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset, replica)
+          if (offsetTruncationState.offset < 
replica.highWatermark.messageOffset)
+            warn(s"Truncating $tp to offset ${offsetTruncationState.offset} 
below high watermark ${replica.highWatermark.messageOffset}")
+
+          partition.truncateTo(offsetTruncationState.offset, isFuture = false)
+          // mark the future replica for truncation only when we do last 
truncation
+          if (offsetTruncationState.truncationCompleted)
+            
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId,
 tp, offsetTruncationState.offset)
+          fetchOffsets.put(tp, offsetTruncationState)
         }
       } catch {
         case e: KafkaStorageException =>
@@ -344,7 +341,7 @@ class ReplicaFetcherThread(name: String,
     var result: Map[TopicPartition, EpochEndOffset] = null
     if (shouldSendLeaderEpochRequest) {
       val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> 
epoch.asInstanceOf[Integer] }.toMap.asJava
-      val epochRequest = new 
OffsetsForLeaderEpochRequest.Builder(partitionsAsJava)
+      val epochRequest = new 
OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, 
partitionsAsJava)
       try {
         val response = leaderEndpoint.sendRequest(epochRequest)
         result = 
response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
@@ -355,26 +352,19 @@ class ReplicaFetcherThread(name: String,
 
           // if we get any unexpected exception, mark all partitions with an 
error
           result = partitions.map { case (tp, _) =>
-            tp -> new EpochEndOffset(Errors.forException(t), 
UNDEFINED_EPOCH_OFFSET)
+            tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
           }
       }
     } else {
       // just generate a response with no error but UNDEFINED_OFFSET so that 
we can fall back to truncating using
       // high watermark in maybeTruncate()
       result = partitions.map { case (tp, _) =>
-        tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET)
+        tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
       }
     }
     result
   }
 
-  private def logEndOffset(replica: Replica, epochOffset: EpochEndOffset): 
Long = {
-    val logEndOffset = replica.logEndOffset.messageOffset
-    info(s"Based on follower's leader epoch, leader replied with an offset 
${epochOffset.endOffset} >= the " +
-      s"follower's log end offset $logEndOffset in ${replica.topicPartition}. 
No truncation needed.")
-    logEndOffset
-  }
-
   /**
    *  To avoid ISR thrashing, we only throttle a replica on the follower if 
it's in the throttled replica list,
    *  the quota is exceeded and the replica is not in sync.
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 85c66a5..309a599 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1475,11 +1475,11 @@ class ReplicaManager(val config: KafkaConfig,
       val epochEndOffset = getPartition(tp) match {
         case Some(partition) =>
           if (partition eq ReplicaManager.OfflinePartition)
-            new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET)
+            new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
           else
             partition.lastOffsetForLeaderEpoch(leaderEpoch)
         case None =>
-          new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH_OFFSET)
+          new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
       }
       tp -> epochEndOffset
     }
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala 
b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 220432d..23a5305 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.server.LogOffsetMetadata
 import kafka.server.checkpoints.LeaderEpochCheckpoint
-import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.common.requests.EpochEndOffset._
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
@@ -29,7 +29,7 @@ import scala.collection.mutable.ListBuffer
 trait LeaderEpochCache {
   def assign(leaderEpoch: Int, offset: Long)
   def latestEpoch(): Int
-  def endOffsetFor(epoch: Int): Long
+  def endOffsetFor(epoch: Int): (Int, Long)
   def clearAndFlushLatest(offset: Long)
   def clearAndFlushEarliest(offset: Long)
   def clearAndFlush()
@@ -81,36 +81,42 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, 
leo: () => LogOffsetM
   }
 
   /**
-    * Returns the End Offset for a requested Leader Epoch.
+    * Returns the Leader Epoch and the End Offset for a requested Leader Epoch.
     *
-    * This is defined as the start offset of the first Leader Epoch larger 
than the
-    * Leader Epoch requested, or else the Log End Offset if the latest epoch 
was requested.
+    * The Leader Epoch returned is the largest epoch less than or equal to the 
requested Leader
+    * Epoch. The End Offset is the end offset of this epoch, which is defined 
as the start offset
+    * of the first Leader Epoch larger than the Leader Epoch requested, or 
else the Log End
+    * Offset if the latest epoch was requested.
     *
     * During the upgrade phase, where there are existing messages may not have 
a leader epoch,
     * if requestedEpoch is < the first epoch cached, UNSUPPORTED_EPOCH_OFFSET 
will be returned
     * so that the follower falls back to High Water Mark.
     *
-    * @param requestedEpoch
-    * @return offset
+    * @param requestedEpoch requested leader epoch
+    * @return leader epoch and offset
     */
-  override def endOffsetFor(requestedEpoch: Int): Long = {
+  override def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
     inReadLock(lock) {
-      val offset =
+      val epochAndOffset =
         if (requestedEpoch == UNDEFINED_EPOCH) {
           // this may happen if a bootstrapping follower sends a request with 
undefined epoch or
           // a follower is on the older message format where leader epochs are 
not recorded
-          UNDEFINED_EPOCH_OFFSET
+          (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
         } else if (requestedEpoch == latestEpoch) {
-          leo().messageOffset
+          (requestedEpoch, leo().messageOffset)
         } else {
-          val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
+          val (subsequentEpochs, previousEpochs) = epochs.partition { e => 
e.epoch > requestedEpoch}
           if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
-            UNDEFINED_EPOCH_OFFSET
-          else
-            subsequentEpochs.head.startOffset
+            // no epochs recorded or requested epoch < the first epoch cached
+            (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+          else {
+            // we must get at least one element in previous epochs list, 
because if we are here,
+            // it means that requestedEpoch >= epochs.head.epoch -- so at 
least the first epoch is
+            (previousEpochs.last.epoch, subsequentEpochs.head.startOffset)
+          }
         }
-      debug(s"Processed offset for epoch request for partition 
${topicPartition} epoch:$requestedEpoch and returning offset $offset from epoch 
list of size ${epochs.size}")
-      offset
+      debug(s"Processed offset for epoch request for partition 
${topicPartition} epoch:$requestedEpoch and returning epoch 
${epochAndOffset._1} and offset ${epochAndOffset._2} from epoch list of size 
${epochs.size}")
+      epochAndOffset
     }
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ab7ca64..da45be2 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -290,7 +290,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def offsetsForLeaderEpochRequest = {
-    new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build()
+    new 
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()).add(tp,
 7).build()
   }
 
   private def createOffsetFetchRequest = {
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index b95f66c..bf6db2f 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -21,6 +21,7 @@ import AbstractFetcherThread._
 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
+import kafka.server.OffsetTruncationState
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
@@ -131,7 +132,7 @@ class AbstractFetcherThreadTest {
 
     override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): 
Map[TopicPartition, EpochEndOffset] = { Map() }
 
-    override def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
+    override def maybeTruncate(fetchedEpochs: Map[TopicPartition, 
EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, 
OffsetTruncationState]] = {
       ResultWithPartitions(Map(), Set())
     }
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index a0f1dae..c7a07ec 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -61,7 +61,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Stubs
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes()
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
leo)).anyTimes()
     stub(replica, replica, futureReplica, partition, replicaManager)
 
     replay(leaderEpochs, replicaManager, replica)
@@ -78,8 +78,8 @@ class ReplicaAlterLogDirsThreadTest {
     val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> 
leaderEpoch))
 
     val expected = Map(
-      t1p0 -> new EpochEndOffset(Errors.NONE, leo),
-      t1p1 -> new EpochEndOffset(Errors.NONE, leo)
+      t1p0 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo),
+      t1p1 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo)
     )
 
     assertEquals("results from leader epoch request should have offset from 
local replica",
@@ -101,7 +101,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Stubs
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes()
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
leo)).anyTimes()
     
expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
     
expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
     expect(replicaManager.getReplicaOrException(t1p1)).andThrow(new 
KafkaStorageException).once()
@@ -121,8 +121,8 @@ class ReplicaAlterLogDirsThreadTest {
     val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> 
leaderEpoch))
 
     val expected = Map(
-      t1p0 -> new EpochEndOffset(Errors.NONE, leo),
-      t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, 
UNDEFINED_EPOCH_OFFSET)
+      t1p0 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo),
+      t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
     )
 
     assertEquals(expected, result)
@@ -161,8 +161,10 @@ class ReplicaAlterLogDirsThreadTest {
     
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
     expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
     
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes()
-    
expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn(replicaT1p0LEO).anyTimes()
-    
expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn(replicaT1p1LEO).anyTimes()
+    expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
replicaT1p0LEO)).anyTimes()
+    expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
replicaT1p1LEO)).anyTimes()
+    
expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch,
 futureReplicaLEO)).anyTimes()
+
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplica, partition, 
replicaManager, responseCallback)
 
@@ -189,6 +191,73 @@ class ReplicaAlterLogDirsThreadTest {
   }
 
   @Test
+  def shouldTruncateToEndOffsetOfLargestCommonEpoch(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    // one future replica mock because our mocking methods return same values 
for both future replicas
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => 
Unit]  = EasyMock.newCapture()
+
+    val leaderEpoch = 5
+    val futureReplicaLEO = 195
+    val replicaLEO = 200
+    val replicaEpochEndOffset = 190
+    val futureReplicaEpochEndOffset = 191
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+    expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch - 
2).once()
+
+    // leader replica truncated and fetched new offsets with new leader epoch
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch - 1, 
replicaLEO)).anyTimes()
+    // but future replica does not know about this leader epoch, so returns a 
smaller leader epoch
+    expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch - 
1)).andReturn((leaderEpoch - 2, futureReplicaLEO)).anyTimes()
+    // finally, the leader replica knows about the leader epoch and returns 
end offset
+    expect(leaderEpochs.endOffsetFor(leaderEpoch - 2)).andReturn((leaderEpoch 
- 2, replicaEpochEndOffset)).anyTimes()
+    expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch - 
2)).andReturn((leaderEpoch - 2, futureReplicaEpochEndOffset)).anyTimes()
+
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
+
+    replay(leaderEpochs, futureReplicaLeaderEpochs, replicaManager, 
logManager, quotaManager, replica, futureReplica, partition)
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0))
+
+    // First run will result in another offset for leader epoch request
+    thread.doWork()
+    // Second run should actually truncate
+    thread.doWork()
+
+    //We should have truncated to the offsets in the response
+    assertTrue("Expected offset " + replicaEpochEndOffset + " in captured 
truncation offsets " + truncateToCapture.getValues,
+               
truncateToCapture.getValues.asScala.contains(replicaEpochEndOffset))
+  }
+
+  @Test
   def shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset(): 
Unit = {
 
     //Create a capture to track what partitions/offsets are truncated
@@ -219,9 +288,9 @@ class ReplicaAlterLogDirsThreadTest {
     // pretend this is a completely new future replica, with no leader epochs 
recorded
     
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).anyTimes()
 
-    // since UNDEFINED_EPOCH is -1 wich will be lower than any valid leader 
epoch, the method
+    // since UNDEFINED_EPOCH is -1 which will be lower than any valid leader 
epoch, the method
     // will return UNDEFINED_EPOCH_OFFSET if requested epoch is lower than the 
first epoch cached
-    
expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn(UNDEFINED_EPOCH_OFFSET).anyTimes()
+    
expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn((UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)).anyTimes()
     stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
     replay(replicaManager, logManager, quotaManager, leaderEpochs, 
futureReplicaLeaderEpochs,
            replica, futureReplica, partition)
@@ -273,7 +342,8 @@ class ReplicaAlterLogDirsThreadTest {
     
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
 
     
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(futureReplicaLeaderEpoch).anyTimes()
-    
expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn(replicaLEO).anyTimes()
+    
expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch,
 replicaLEO)).anyTimes()
+    
expect(futureReplicaLeaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch,
 futureReplicaLEO)).anyTimes()
 
     expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
     expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
@@ -355,7 +425,8 @@ class ReplicaAlterLogDirsThreadTest {
 
     expect(futureReplica.logEndOffset).andReturn(new 
LogOffsetMetadata(futureReplicaLEO)).anyTimes()
     expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch)
-    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(replicaLEO)
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
replicaLEO))
+    
expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch,
 futureReplicaLEO))
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback)
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 2074044..f8f4948 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -20,7 +20,6 @@ import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.log.LogManager
 import kafka.cluster.Partition
 import kafka.server.epoch.LeaderEpochCache
-import org.apache.kafka.common.requests.EpochEndOffset._
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
@@ -28,6 +27,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.EpochEndOffset
+import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType}
@@ -43,17 +43,18 @@ class ReplicaFetcherThreadTest {
   private val t1p1 = new TopicPartition("topic1", 1)
   private val t2p1 = new TopicPartition("topic2", 1)
 
+  private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
+
   @Test
   def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
     props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
     val config = KafkaConfig.fromProps(props)
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
     val thread = new ReplicaFetcherThread(
       name = "bob",
       fetcherId = 0,
-      sourceBroker = endPoint,
+      sourceBroker = brokerEndPoint,
       brokerConfig = config,
       replicaMgr = null,
       metrics =  new Metrics(),
@@ -64,8 +65,8 @@ class ReplicaFetcherThreadTest {
     val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
 
     val expected = Map(
-      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET),
-      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET)
+      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
     )
 
     assertEquals("results from leader epoch request should have undefined 
offset", expected, result)
@@ -75,7 +76,6 @@ class ReplicaFetcherThreadTest {
   def shouldHandleExceptionFromBlockingSend(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     val config = KafkaConfig.fromProps(props)
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
     val mockBlockingSend = createMock(classOf[BlockingSend])
 
     expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new 
NullPointerException).once()
@@ -84,7 +84,7 @@ class ReplicaFetcherThreadTest {
     val thread = new ReplicaFetcherThread(
       name = "bob",
       fetcherId = 0,
-      sourceBroker = endPoint,
+      sourceBroker = brokerEndPoint,
       brokerConfig = config,
       replicaMgr = null,
       metrics =  new Metrics(),
@@ -95,8 +95,8 @@ class ReplicaFetcherThreadTest {
     val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
 
     val expected = Map(
-      t1p0 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, 
UNDEFINED_EPOCH_OFFSET),
-      t1p1 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, 
UNDEFINED_EPOCH_OFFSET)
+      t1p0 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
     )
 
     assertEquals("results from leader epoch request should have undefined 
offset", expected, result)
@@ -104,7 +104,7 @@ class ReplicaFetcherThreadTest {
   }
 
   @Test
-  def shouldFetchLeaderEpochOnFirstFetchOnly(): Unit = {
+  def shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth(): Unit = 
{
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
 
     //Setup all dependencies
@@ -116,27 +116,29 @@ class ReplicaFetcherThreadTest {
     val partition = createMock(classOf[Partition])
     val replicaManager = createMock(classOf[ReplicaManager])
 
+    val leaderEpoch = 5
+
     //Stubs
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5)
+    expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(0)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
0)).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-
     //Expectations
     expect(partition.truncateTo(anyLong(), anyBoolean())).once
 
     replay(leaderEpochs, replicaManager, logManager, quota, replica)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
-    val offsets = Map(t1p0 -> new EpochEndOffset(1), t1p1 -> new 
EpochEndOffset(1)).asJava
+    val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new 
EpochEndOffset(leaderEpoch, 1)).asJava
 
     //Create the fetcher thread
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
-    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, endPoint, 
new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
 
     //Loop 1
@@ -174,13 +176,16 @@ class ReplicaFetcherThreadTest {
     val partition = createMock(classOf[Partition])
     val replicaManager = createMock(classOf[ReplicaManager])
 
+    val leaderEpoch = 5
     val initialLEO = 200
 
     //Stubs
     expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
1)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes()
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
initialLEO)).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
@@ -188,20 +193,208 @@ class ReplicaFetcherThreadTest {
     replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
-    val offsetsReply = Map(t1p0 -> new EpochEndOffset(156), t2p1 -> new 
EpochEndOffset(172)).asJava
+    val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 156), t2p1 
-> new EpochEndOffset(leaderEpoch, 172)).asJava
 
     //Create the thread
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
-    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
endPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, 
configs(0), replicaManager, new Metrics(), new SystemTime(), quota, 
Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
 
     //Run it
     thread.doWork()
 
     //We should have truncated to the offsets in the response
-    assertTrue(truncateToCapture.getValues.asScala.contains(156))
-    assertTrue(truncateToCapture.getValues.asScala.contains(172))
+    assertTrue("Expected " + t1p0 + " to truncate to offset 156 (truncation 
offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(156))
+    assertTrue("Expected " + t2p1 + " to truncate to offset 172 (truncation 
offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(172))
+  }
+
+  @Test
+  def 
shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs():
 Unit = {
+    // Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+
+    // Setup all the dependencies
+    val configs = TestUtils.createBrokerConfigs(1, 
"localhost:1234").map(KafkaConfig.fromProps)
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpochAtFollower = 5
+    val leaderEpochAtLeader = 4
+    val initialLEO = 200
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
3)).anyTimes()
+    
expect(leaderEpochs.latestEpoch).andReturn(leaderEpochAtFollower).anyTimes()
+    
expect(leaderEpochs.endOffsetFor(leaderEpochAtLeader)).andReturn((UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    stub(replica, partition, replicaManager)
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
+    val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpochAtLeader, 
156),
+                           t2p1 -> new EpochEndOffset(leaderEpochAtLeader, 
202)).asJava
+
+    //Create the thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, 
configs(0), replicaManager, new Metrics(), new SystemTime(), quota, 
Some(mockNetwork))
+    thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
+
+    //Run it
+    thread.doWork()
+
+    //We should have truncated to the offsets in the response
+    assertTrue("Expected " + t1p0 + " to truncate to offset 156 (truncation 
offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(156))
+    assertTrue("Expected " + t2p1 + " to truncate to offset " + initialLEO +
+               " (truncation offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(initialLEO))
+  }
+
+  @Test
+  def 
shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower(): 
Unit = {
+
+    // Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+
+    // Setup all dependencies
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val initialLEO = 200
+
+    // Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
2)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(5)
+    expect(leaderEpochs.endOffsetFor(4)).andReturn((3, 120)).anyTimes()
+    expect(leaderEpochs.endOffsetFor(3)).andReturn((3, 120)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    stub(replica, partition, replicaManager)
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+
+    // Define the offsets for the OffsetsForLeaderEpochResponse
+    val offsets = Map(t1p0 -> new EpochEndOffset(4, 155), t1p1 -> new 
EpochEndOffset(4, 143)).asJava
+
+    // Create the fetcher thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    // Loop 1 -- both topic partitions will need to fetch another leader epoch
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(0, mockNetwork.fetchCount)
+
+    // Loop 2 should do the second fetch for both topic partitions because the 
leader replied with
+    // epoch 4 while follower knows only about epoch 3
+    val nextOffsets = Map(t1p0 -> new EpochEndOffset(3, 101), t1p1 -> new 
EpochEndOffset(3, 102)).asJava
+    mockNetwork.setOffsetsForNextResponse(nextOffsets)
+    thread.doWork()
+    assertEquals(2, mockNetwork.epochFetchCount)
+    assertEquals(1, mockNetwork.fetchCount)
+    assertEquals("OffsetsForLeaderEpochRequest version.",
+                 1, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
+
+    //Loop 3 we should not fetch epochs
+    thread.doWork()
+    assertEquals(2, mockNetwork.epochFetchCount)
+    assertEquals(2, mockNetwork.fetchCount)
+
+
+    //We should have truncated to the offsets in the second response
+    assertTrue("Expected " + t1p1 + " to truncate to offset 102 (truncation 
offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(102))
+    assertTrue("Expected " + t1p0 + " to truncate to offset 101 (truncation 
offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(101))
+  }
+
+  @Test
+  def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {
+
+    // Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0")
+    val config = KafkaConfig.fromProps(props)
+
+    // Setup all dependencies
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val initialLEO = 200
+
+    // Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), 
anyBoolean())).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
2)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(5)
+    expect(leaderEpochs.endOffsetFor(4)).andReturn((3, 120)).anyTimes()
+    expect(leaderEpochs.endOffsetFor(3)).andReturn((3, 120)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    stub(replica, partition, replicaManager)
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
+
+    // Define the offsets for the OffsetsForLeaderEpochResponse with undefined 
epoch to simulate
+    // older protocol version
+    val offsets = Map(t1p0 -> new 
EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 155), t1p1 -> new 
EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 143)).asJava
+
+    // Create the fetcher thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    // Loop 1 -- both topic partitions will truncate to leader offset even 
though they don't know
+    // about leader epoch
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(1, mockNetwork.fetchCount)
+    assertEquals("OffsetsForLeaderEpochRequest version.",
+                 0, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
+
+    //Loop 2 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(2, mockNetwork.fetchCount)
+
+    //We should have truncated to the offsets in the first response
+    assertTrue("Expected " + t1p0 + " to truncate to offset 155 (truncation 
offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(155))
+    assertTrue("Expected " + t1p1 + " to truncate to offset 143 (truncation 
offsets: " + truncateToCapture.getValues + ")",
+               truncateToCapture.getValues.asScala.contains(143))
   }
 
   @Test
@@ -227,6 +420,7 @@ class ReplicaFetcherThreadTest {
     expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLeo)).anyTimes()
+    expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(initialFetchOffset)).anyTimes()
     expect(leaderEpochs.latestEpoch).andReturn(5)
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
@@ -234,18 +428,17 @@ class ReplicaFetcherThreadTest {
     replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
-    val offsetsReply = Map(t1p0 -> new 
EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava
+    val offsetsReply = Map(t1p0 -> new 
EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava
 
     //Create the thread
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
-    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
endPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, 
configs(0), replicaManager, new Metrics(), new SystemTime(), quota, 
Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> initialFetchOffset))
 
     //Run it
     thread.doWork()
 
-    //We should have truncated to the highwatermark for partitino 2 only
+    //We should have truncated to initial fetch offset
     assertEquals(initialFetchOffset, truncated.getValue)
   }
 
@@ -265,6 +458,7 @@ class ReplicaFetcherThreadTest {
     val partition = createMock(classOf[Partition])
     val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
 
+    val leaderEpoch = 5
     val highWaterMark = 100
     val initialLeo = 300
 
@@ -273,7 +467,9 @@ class ReplicaFetcherThreadTest {
     expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLeo)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5)
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
+    // this is for the last reply with EpochEndOffset(5, 156)
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
initialLeo)).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
@@ -281,14 +477,13 @@ class ReplicaFetcherThreadTest {
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used 
for truncation
     val offsetsReply = mutable.Map(
-      t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
-      t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
+      t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, 
EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, 
EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
     ).asJava
 
     //Create the thread
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
-    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
endPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, 
configs(0), replicaManager, new Metrics(), new SystemTime(), quota, 
Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
 
     //Run thread 3 times
@@ -300,7 +495,7 @@ class ReplicaFetcherThreadTest {
     assertEquals(0, truncated.getValues.size())
 
     //New leader elected and replies
-    offsetsReply.put(t1p0, new EpochEndOffset(156))
+    offsetsReply.put(t1p0, new EpochEndOffset(leaderEpoch, 156))
 
     thread.doWork()
 
@@ -321,10 +516,14 @@ class ReplicaFetcherThreadTest {
     val partition = createMock(classOf[Partition])
     val replicaManager = createNiceMock(classOf[ReplicaManager])
 
+    val leaderEpoch = 4
+
     //Stub return values
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
-    expect(leaderEpochs.latestEpoch).andReturn(5)
+    expect(replica.highWatermark).andReturn(new 
LogOffsetMetadata(0)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
0)).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
@@ -333,13 +532,12 @@ class ReplicaFetcherThreadTest {
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsetsReply = Map(
-      t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1)
+      t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new 
EpochEndOffset(leaderEpoch, 1)
     ).asJava
 
     //Create the fetcher thread
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
-    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
endPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
 
     //When
     thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
@@ -373,7 +571,9 @@ class ReplicaFetcherThreadTest {
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new 
LogOffsetMetadata(initialLEO)).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 
2)).anyTimes()
     expect(leaderEpochs.latestEpoch).andReturn(5)
+    expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLEO)).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
@@ -382,13 +582,12 @@ class ReplicaFetcherThreadTest {
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsetsReply = Map(
-      t1p0 -> new EpochEndOffset(52), t1p1 -> new EpochEndOffset(49)
+      t1p0 -> new EpochEndOffset(5, 52), t1p1 -> new EpochEndOffset(5, 49)
     ).asJava
 
     //Create the fetcher thread
-    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
-    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
endPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, 
brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
 
     //When
     thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
@@ -417,4 +616,4 @@ class ReplicaFetcherThreadTest {
     
expect(replicaManager.getReplicaOrException(t2p1)).andReturn(replica).anyTimes()
     
expect(replicaManager.getPartition(t2p1)).andReturn(Some(partition)).anyTimes()
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 8a50fca..877b5c3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -258,7 +258,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new InitProducerIdRequest.Builder("abc")
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
-          new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
+          new 
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()).add(tp,
 0)
 
         case ApiKeys.ADD_PARTITIONS_TO_TXN =>
           new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, 
List(tp).asJava)
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
 
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 6288d8f..05a6bb3 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -38,6 +38,7 @@ import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ListBuffer => Buffer}
+import scala.collection.Seq
 
 /**
   * These tests were written to assert the addition of leader epochs to the 
replication protocol fix the problems
@@ -298,6 +299,86 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends 
ZooKeeperTestHarness
     }
   }
 
+  @Test
+  def logsShouldNotDivergeOnUncleanLeaderElections(): Unit = {
+
+    // Given two brokers, unclean leader election is enabled
+    brokers = (100 to 101).map(createBroker(_, enableUncleanLeaderElection = 
true))
+
+    // A single partition topic with 2 replicas, min.isr = 1
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(
+      topic, Map(0 -> Seq(100, 101)), config = 
CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1"))
+    )
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries 
= 5, acks = 1)
+
+    // Write one message while both brokers are up
+    (0 until 1).foreach { i =>
+      producer.send(new ProducerRecord(topic, 0, null, msg))
+      producer.flush()}
+
+    // Since we use producer with acks = 1, make sure that logs match for the 
first epoch
+    waitForLogsToMatch(brokers(0), brokers(1))
+
+    // shutdown broker 100
+    brokers(0).shutdown()
+
+    //Write 1 message
+    (0 until 1).foreach { i =>
+      producer.send(new ProducerRecord(topic, 0, null, msg))
+      producer.flush()}
+
+    brokers(1).shutdown()
+    brokers(0).startup()
+
+    //Bounce the producer (this is required, probably because the broker port 
changes on restart?)
+    producer.close()
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries 
= 5, acks = 1)
+
+    //Write 3 messages
+    (0 until 3).foreach { i =>
+      producer.send(new ProducerRecord(topic, 0, null, msgBigger))
+      producer.flush()}
+
+    brokers(0).shutdown()
+    brokers(1).startup()
+
+    //Bounce the producer (this is required, probably because the broker port 
changes on restart?)
+    producer.close()
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries 
= 5, acks = 1)
+
+    //Write 1 message
+    (0 until 1).foreach { i =>
+      producer.send(new ProducerRecord(topic, 0, null, msg))
+      producer.flush()}
+
+    brokers(1).shutdown()
+    brokers(0).startup()
+
+    //Bounce the producer (this is required, probably because the broker port 
changes on restart?)
+    producer.close()
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries 
= 5, acks = 1)
+
+    //Write 2 messages
+    (0 until 2).foreach { i =>
+      producer.send(new ProducerRecord(topic, 0, null, msgBigger))
+      producer.flush()}
+
+    printSegments()
+
+    brokers(1).startup()
+
+    waitForLogsToMatch(brokers(0), brokers(1))
+    printSegments()
+
+    def crcSeq(broker: KafkaServer, partition: Int = 0): Seq[Long] = {
+      val batches = getLog(broker, partition).activeSegment.read(0, None, 
Integer.MAX_VALUE)
+        .records.batches().asScala.toSeq
+      batches.map(_.checksum)
+    }
+    assertTrue(s"Logs on Broker 100 and Broker 101 should match",
+               crcSeq(brokers(0)) == crcSeq(brokers(1)))
+  }
+
   private def log(leader: KafkaServer, follower: KafkaServer): Unit = {
     info(s"Bounce complete for follower ${follower.config.brokerId}")
     info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 
0).logEndOffset + " cache: " + epochCache(leader).epochEntries())
@@ -389,12 +470,13 @@ class EpochDrivenReplicationProtocolAcceptanceTest 
extends ZooKeeperTestHarness
     brokers.filter(_.config.brokerId != leader)(0)
   }
 
-  private def createBroker(id: Int): KafkaServer = {
+  private def createBroker(id: Int, enableUncleanLeaderElection: Boolean = 
false): KafkaServer = {
     val config = createBrokerConfig(id, zkConnect)
     if(!KIP_101_ENABLED) {
       config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_0_11_0_IV1.version)
       config.setProperty(KafkaConfig.LogMessageFormatVersionProp, 
KAFKA_0_11_0_IV1.version)
     }
+    config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, 
enableUncleanLeaderElection.toString)
     createServer(fromProps(config))
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 4a8df11..d1f9390 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -50,7 +50,7 @@ class LeaderEpochFileCacheTest {
     //Then
     assertEquals(2, cache.latestEpoch())
     assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
-    assertEquals(11, cache.endOffsetFor(2)) //should match leo
+    assertEquals((2, leo), cache.endOffsetFor(2)) //should match leo
   }
 
   @Test
@@ -67,23 +67,27 @@ class LeaderEpochFileCacheTest {
     leo = 14
 
     //Then
-    assertEquals(14, cache.endOffsetFor(2))
+    assertEquals((2, leo), cache.endOffsetFor(2))
   }
 
   @Test
   def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = {
     def leoFinder() = new LogOffsetMetadata(0)
+    val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
 
     //Given cache with some data on leader
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    // assign couple of epochs
     cache.assign(epoch = 2, offset = 11)
     cache.assign(epoch = 3, offset = 12)
 
     //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
-    val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
+    val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
 
     //Then
-    assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor)
+    assertEquals("Expected undefined epoch and offset if undefined epoch 
requested. Cache not empty.",
+                 expectedEpochEndOffset, epochAndOffsetFor)
   }
 
   @Test
@@ -140,7 +144,7 @@ class LeaderEpochFileCacheTest {
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
     //Then
-    assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0))
+    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), 
cache.endOffsetFor(0))
   }
 
   @Test
@@ -155,7 +159,8 @@ class LeaderEpochFileCacheTest {
     val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
 
     //Then
-    assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor)
+    assertEquals("Expected undefined epoch and offset if undefined epoch 
requested. Empty cache.",
+                 (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), offsetFor)
   }
 
   @Test
@@ -170,10 +175,10 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 7, offset = 13)
 
     //When
-    val offset = cache.endOffsetFor(5 - 1)
+    val epochAndOffset = cache.endOffsetFor(5 - 1)
 
     //Then
-    assertEquals(UNDEFINED_EPOCH_OFFSET, offset)
+    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffset)
   }
 
   @Test
@@ -194,7 +199,7 @@ class LeaderEpochFileCacheTest {
     leo = 17
 
     //Then get the start offset of the next epoch
-    assertEquals(15, cache.endOffsetFor(2))
+    assertEquals((2, 15), cache.endOffsetFor(2))
   }
 
   @Test
@@ -210,8 +215,9 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 17)
 
     //Then
-    assertEquals(13, cache.endOffsetFor(requestedEpoch = 1))
-    assertEquals(17, cache.endOffsetFor(requestedEpoch = 2))
+    assertEquals((0, 13), cache.endOffsetFor(requestedEpoch = 1))
+    assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 2))
+    assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 3))
   }
 
   @Test
@@ -242,7 +248,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 2, offset = 100)
 
     //Then
-    assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3))
+    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), 
cache.endOffsetFor(3))
   }
 
   @Test
@@ -258,7 +264,7 @@ class LeaderEpochFileCacheTest {
     leo = 7
 
     //Then
-    assertEquals(leo, cache.endOffsetFor(2))
+    assertEquals((2, leo), cache.endOffsetFor(2))
     assertEquals(1, cache.epochEntries.size)
     assertEquals(EpochEntry(2, 6), cache.epochEntries()(0))
   }
@@ -300,10 +306,10 @@ class LeaderEpochFileCacheTest {
     assertEquals(2, cache.latestEpoch())
 
     //Then end offset for epoch 1 shouldn't have changed
-    assertEquals(6, cache.endOffsetFor(1))
+    assertEquals((1, 6), cache.endOffsetFor(1))
 
-    //Then end offset for epoch 2 has to be the offset of the epoch 1 message 
(I can't thing of a better option)
-    assertEquals(8, cache.endOffsetFor(2))
+    //Then end offset for epoch 2 has to be the offset of the epoch 1 message 
(I can't think of a better option)
+    assertEquals((2, 8), cache.endOffsetFor(2))
 
     //Epoch history shouldn't have changed
     assertEquals(EpochEntry(1, 5), cache.epochEntries()(0))
@@ -340,17 +346,17 @@ class LeaderEpochFileCacheTest {
     //Then epoch should go up
     assertEquals(1, cache.latestEpoch())
     //offset for 1 should still be 0
-    assertEquals(0, cache.endOffsetFor(1))
+    assertEquals((1, 0), cache.endOffsetFor(1))
     //offset for epoch 0 should still be 0
-    assertEquals(0, cache.endOffsetFor(0))
+    assertEquals((0, 0), cache.endOffsetFor(0))
 
     //When we write 5 messages as epoch 1
     leo = 5
 
     //Then end offset for epoch(1) should be leo => 5
-    assertEquals(5, cache.endOffsetFor(1))
+    assertEquals((1, 5), cache.endOffsetFor(1))
     //Epoch 0 should still be at offset 0
-    assertEquals(0, cache.endOffsetFor(0))
+    assertEquals((0, 0), cache.endOffsetFor(0))
 
     //When
     cache.assign(epoch = 2, offset = 5) //leo=5
@@ -358,13 +364,13 @@ class LeaderEpochFileCacheTest {
     leo = 10 //write another 5 messages
 
     //Then end offset for epoch(2) should be leo => 10
-    assertEquals(10, cache.endOffsetFor(2))
+    assertEquals((2, 10), cache.endOffsetFor(2))
 
     //end offset for epoch(1) should be the start offset of epoch(2) => 5
-    assertEquals(5, cache.endOffsetFor(1))
+    assertEquals((1, 5), cache.endOffsetFor(1))
 
     //epoch (0) should still be 0
-    assertEquals(0, cache.endOffsetFor(0))
+    assertEquals((0, 0), cache.endOffsetFor(0))
   }
 
   @Test
@@ -382,7 +388,7 @@ class LeaderEpochFileCacheTest {
 
     //Then epoch should stay, offsets should grow
     assertEquals(0, cache.latestEpoch())
-    assertEquals(leo, cache.endOffsetFor(0))
+    assertEquals((0, leo), cache.endOffsetFor(0))
 
     //When messages arrive with greater epoch
     cache.assign(epoch = 1, offset = 3); leo = 4
@@ -390,7 +396,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 1, offset = 5); leo = 6
 
     assertEquals(1, cache.latestEpoch())
-    assertEquals(leo, cache.endOffsetFor(1))
+    assertEquals((1, leo), cache.endOffsetFor(1))
 
     //When
     cache.assign(epoch = 2, offset = 6); leo = 7
@@ -398,11 +404,11 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 2, offset = 8); leo = 9
 
     assertEquals(2, cache.latestEpoch())
-    assertEquals(leo, cache.endOffsetFor(2))
+    assertEquals((2, leo), cache.endOffsetFor(2))
 
     //Older epochs should return the start offset of the first message in the 
subsequent epoch.
-    assertEquals(3, cache.endOffsetFor(0))
-    assertEquals(6, cache.endOffsetFor(1))
+    assertEquals((0, 3), cache.endOffsetFor(0))
+    assertEquals((1, 6), cache.endOffsetFor(1))
   }
 
   @Test
@@ -648,7 +654,7 @@ class LeaderEpochFileCacheTest {
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
     //Then
-    assertEquals(-1, cache.endOffsetFor(7))
+    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), 
cache.endOffsetFor(7))
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index dc6ff9e..907db7a 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.{LogContext, SystemTime}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
 
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -265,7 +266,7 @@ class LeaderEpochIntegrationTest extends 
ZooKeeperTestHarness with Logging {
   private[epoch] class TestFetcherThread(sender: BlockingSend) extends Logging 
{
 
     def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): 
Map[TopicPartition, EpochEndOffset] = {
-      val request = new 
OffsetsForLeaderEpochRequest.Builder(toJavaFormat(partitions))
+      val request = new 
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
 toJavaFormat(partitions))
       val response = sender.sendRequest(request)
       
response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 1c01d62..5c60c00 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -41,7 +41,7 @@ class OffsetsForLeaderEpochTest {
   @Test
   def shouldGetEpochsFromReplica(): Unit = {
     //Given
-    val offset = 42
+    val epochAndOffset = (5, 42L)
     val epochRequested: Integer = 5
     val request = Map(tp -> epochRequested)
 
@@ -49,7 +49,7 @@ class OffsetsForLeaderEpochTest {
     val mockLog = createNiceMock(classOf[kafka.log.Log])
     val mockCache = 
createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
     val logManager = createNiceMock(classOf[kafka.log.LogManager])
-    expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
+    expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(mockCache, mockLog, logManager)
@@ -67,7 +67,7 @@ class OffsetsForLeaderEpochTest {
     val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.NONE, offset), response(tp))
+    assertEquals(new EpochEndOffset(Errors.NONE, epochAndOffset._1, 
epochAndOffset._2), response(tp))
   }
 
   @Test
@@ -90,7 +90,7 @@ class OffsetsForLeaderEpochTest {
     val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, 
UNDEFINED_EPOCH_OFFSET), response(tp))
+    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, 
UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
   }
 
   @Test
@@ -112,6 +112,6 @@ class OffsetsForLeaderEpochTest {
     val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH_OFFSET), response(tp))
+    assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
   }
-}
\ No newline at end of file
+}
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
 
b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index 1f5bec1..50a4d74 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -28,17 +28,28 @@ import org.apache.kafka.common.utils.{SystemTime, Time}
 
 /**
   * Stub network client used for testing the ReplicaFetcher, wraps the 
MockClient used for consumer testing
+  *
+  * The common case is that there is only one OFFSET_FOR_LEADER_EPOCH 
request/response. So, the
+  * response to OFFSET_FOR_LEADER_EPOCH is 'offsets' map. If the test needs to 
set another round of
+  * OFFSET_FOR_LEADER_EPOCH with different offsets in response, it should 
update offsets using
+  * setOffsetsForNextResponse
   */
 class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, 
EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
   private val client = new MockClient(new SystemTime)
   var fetchCount = 0
   var epochFetchCount = 0
+  var lastUsedOffsetForLeaderEpochVersion = -1
   var callback: Option[() => Unit] = None
+  var currentOffsets: java.util.Map[TopicPartition, EpochEndOffset] = offsets
 
   def setEpochRequestCallback(postEpochFunction: () => Unit){
     callback = Some(postEpochFunction)
   }
 
+  def setOffsetsForNextResponse(newOffsets: java.util.Map[TopicPartition, 
EpochEndOffset]): Unit = {
+    currentOffsets = newOffsets
+  }
+
   override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): 
ClientResponse = {
 
     //Send the request to the mock client
@@ -50,7 +61,8 @@ class ReplicaFetcherMockBlockingSend(offsets: 
java.util.Map[TopicPartition, Epoc
       case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
         callback.foreach(_.apply())
         epochFetchCount += 1
-        new OffsetsForLeaderEpochResponse(offsets)
+        lastUsedOffsetForLeaderEpochVersion = 
requestBuilder.latestAllowedVersion()
+        new OffsetsForLeaderEpochResponse(currentOffsets)
 
       case ApiKeys.FETCH =>
         fetchCount += 1
@@ -75,4 +87,4 @@ class ReplicaFetcherMockBlockingSend(offsets: 
java.util.Map[TopicPartition, Epoc
   }
 
   override def close(): Unit = {}
-}
\ No newline at end of file
+}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 451f103..f15191d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,58 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 1.2.x to 2.0.0</a></h4>
+<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended 
rolling upgrade plan below,
+    you guarantee no downtime during the upgrade. However, please review the 
<a href="#upgrade_200_notable">notable changes in 2.0.0</a> before upgrading.
+</p>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties on all brokers and add the following 
properties. CURRENT_KAFKA_VERSION refers to the version you
+        are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the 
message format version currently in use. If you have previously
+        overridden the message format version, you should keep its current 
value. Alternatively, if you are upgrading from a version prior
+        to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to 
match CURRENT_KAFKA_VERSION.
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 1.2).</li>
+            <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See <a href="#upgrade_10_performance_impact">potential performance impact
+                following the upgrade</a> for the details on what this 
configuration does.)</li>
+        </ul>
+        If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x or 1.2.x and you have 
not overridden the message format, then you only need to override
+        the inter-broker protocol format.
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 
1.0, 1.1, 1.2).</li>
+        </ul>
+    </li>
+    <li> Upgrade the brokers one at a time: shut down the broker, update the 
code, and restart it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by 
editing <code>inter.broker.protocol.version</code> and setting it to 2.0.
+    <li> Restart the brokers one by one for the new protocol version to take 
effect.</li>
+    <li> If you have overridden the message format version as instructed 
above, then you need to do one more rolling restart to
+        upgrade it to its latest version. Once all (or most) consumers have 
been upgraded to 0.11.0 or later,
+        change log.message.format.version to 2.0 on each broker and restart 
them one by one. Note that the older Scala consumer
+        does not support the new message format introduced in 0.11, so to 
avoid the performance cost of down-conversion (or to
+        take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly 
once semantics</a>), the newer Java consumer must be used.</li>
+</ol>
+
+<p><b>Additional Upgrade Notes:</b></p>
+
+<ol>
+    <li>If you are willing to accept downtime, you can simply take all the 
brokers down, update the code and start them back up. They will start
+        with the new protocol by default.</li>
+    <li>Bumping the protocol version and restarting can be done any time after 
the brokers are upgraded. It does not have to be immediately after.
+        Similarly for the message format version.</li>
+    <li>If you are using Java8 method references in your Kafka Streams code 
you might need to update your code to resolve method ambiguties.
+        Hot-swaping the jar-file only might not work.</li>
+</ol>
+
+<h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 
2.0.0</a></h5>
+<ul>
+</ul>
+
+<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New 
Protocol Versions</a></h5>
+<ul>
+    <li> <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over";>KIP-279</a>:
 OffsetsForLeaderEpochResponse v1 introduces a partition-level 
<code>leader_epoch</code> field. </li>
+</ul>
 
 <h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.x</a></h4>
 <p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended 
rolling upgrade plan below,

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to