This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0984a76 MINOR: Move common out of range handling into
AbstractFetcherThread (#5608)
0984a76 is described below
commit 0984a76b712caa18c688eafbacaa2a7c889d27b2
Author: Jason Gustafson <[email protected]>
AuthorDate: Sat Sep 8 17:00:13 2018 -0700
MINOR: Move common out of range handling into AbstractFetcherThread (#5608)
This patch removes the duplication of the out of range handling between
`ReplicaFetcherThread` and `ReplicaAlterLogDirsThread` and attempts to expose a
cleaner API for extension. It also adds a mock implementation to facilitate
testing and several new test cases.
Reviewers: Jun Rao <[email protected]>
---
.../apache/kafka/common/requests/FetchRequest.java | 2 +-
core/src/main/scala/kafka/cluster/Partition.scala | 20 +-
core/src/main/scala/kafka/cluster/Replica.scala | 5 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 231 +++++++----
.../kafka/server/ReplicaAlterLogDirsThread.scala | 79 ++--
.../scala/kafka/server/ReplicaFetcherThread.scala | 140 +++----
.../ReplicaFetcherThreadFatalErrorTest.scala | 2 +-
.../kafka/server/AbstractFetcherThreadTest.scala | 454 ++++++++++++++++-----
8 files changed, 615 insertions(+), 318 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 6e25f7c..e013f5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -267,7 +267,7 @@ public class FetchRequest extends AbstractRequest {
private IsolationLevel isolationLevel =
IsolationLevel.READ_UNCOMMITTED;
private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
private FetchMetadata metadata = FetchMetadata.LEGACY;
- private List<TopicPartition> toForget =
Collections.<TopicPartition>emptyList();
+ private List<TopicPartition> toForget = Collections.emptyList();
public static Builder forConsumer(int maxWait, int minBytes,
Map<TopicPartition, PartitionData> fetchData) {
return new Builder(ApiKeys.FETCH.oldestVersion(),
ApiKeys.FETCH.latestVersion(),
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index d76d6d0..2036bb0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,7 +16,6 @@
*/
package kafka.cluster
-
import java.util.concurrent.locks.ReentrantReadWriteLock
import com.yammer.metrics.core.Gauge
@@ -591,26 +590,25 @@ class Partition(val topic: String,
laggingReplicas
}
- private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords,
isFuture: Boolean): Unit = {
+ private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords,
isFuture: Boolean): Option[LogAppendInfo] = {
+ // The read lock is needed to handle race condition if request handler
thread tries to
+ // remove future replica after receiving AlterReplicaLogDirsRequest.
inReadLock(leaderIsrUpdateLock) {
if (isFuture) {
- // The read lock is needed to handle race condition if request handler
thread tries to
- // remove future replica after receiving AlterReplicaLogDirsRequest.
- inReadLock(leaderIsrUpdateLock) {
- getReplica(Request.FutureLocalReplicaId) match {
- case Some(replica) => replica.log.get.appendAsFollower(records)
- case None => // Future replica is removed by a
non-ReplicaAlterLogDirsThread before this method is called
- }
+ // Note the replica may be undefined if it is removed by a
non-ReplicaAlterLogDirsThread before
+ // this method is called
+ getReplica(Request.FutureLocalReplicaId).map { replica =>
+ replica.log.get.appendAsFollower(records)
}
} else {
// The read lock is needed to prevent the follower replica from being
updated while ReplicaAlterDirThread
// is executing maybeDeleteAndSwapFutureReplica() to replace follower
replica with the future replica.
- getReplicaOrException().log.get.appendAsFollower(records)
+ Some(getReplicaOrException().log.get.appendAsFollower(records))
}
}
}
- def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture:
Boolean) {
+ def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture:
Boolean): Option[LogAppendInfo] = {
try {
doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
} catch {
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala
b/core/src/main/scala/kafka/cluster/Replica.scala
index 962aaff..839579b 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,6 +18,7 @@
package kafka.cluster
import kafka.log.Log
+import kafka.server.epoch.LeaderEpochCache
import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -52,9 +53,9 @@ class Replica(val brokerId: Int,
def isLocal: Boolean = log.isDefined
- def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
+ def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
- val epochs = log.map(_.leaderEpochCache)
+ val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
info(s"Replica loaded for partition $topicPartition with initial high
watermark $initialHighWatermarkValue")
log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e753f6e..44137cf 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.server
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.BrokerEndPoint
import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
import org.apache.kafka.common.errors.{CorruptRecordException,
KafkaStorageException}
import org.apache.kafka.common.requests.EpochEndOffset._
@@ -36,10 +36,11 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import com.yammer.metrics.core.Gauge
+import kafka.log.LogAppendInfo
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest,
FetchResponse}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest,
FetchResponse, ListOffsetRequest}
import scala.math._
@@ -50,8 +51,7 @@ abstract class AbstractFetcherThread(name: String,
clientId: String,
val sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0,
- isInterruptible: Boolean = true,
- includeLogTruncation: Boolean)
+ isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {
type PD = FetchResponse.PartitionData[Records]
@@ -67,21 +67,31 @@ abstract class AbstractFetcherThread(name: String,
/* callbacks to be defined in subclass */
// process fetched data
- protected def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long, partitionData: PD,
- records: MemoryRecords)
+ protected def processPartitionData(topicPartition: TopicPartition,
+ fetchOffset: Long,
+ partitionData: PD): Option[LogAppendInfo]
- // handle a partition whose offset is out of range and return a new fetch
offset
- protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
+ protected def truncate(topicPartition: TopicPartition, truncationState:
OffsetTruncationState): Unit
- protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset]
-
- protected def truncate(topicPartition: TopicPartition, epochEndOffset:
EpochEndOffset): OffsetTruncationState
+ protected def truncateFullyAndStartAt(topicPartition: TopicPartition,
offset: Long): Unit
protected def buildFetch(partitionMap: Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
- protected def fetch(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)]
+ protected def isUncleanLeaderElectionAllowed(topicPartition:
TopicPartition): Boolean
+
+ protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
+
+ protected def logEndOffset(topicPartition: TopicPartition): Long
+
+ protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int):
Option[OffsetAndEpoch]
+
+ protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset]
+
+ protected def fetchFromLeader(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)]
+
+ protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition):
Long
- protected def getReplica(tp: TopicPartition): Option[Replica]
+ protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition):
Long
override def shutdown() {
initiateShutdown()
@@ -97,7 +107,10 @@ abstract class AbstractFetcherThread(name: String,
override def doWork() {
maybeTruncate()
+ maybeFetch()
+ }
+ private def maybeFetch(): Unit = {
val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
val fetchStates = partitionStates.partitionStateMap.asScala
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) =
buildFetch(fetchStates)
@@ -134,7 +147,7 @@ abstract class AbstractFetcherThread(name: String,
partitionStates.partitionStates.asScala.foreach { state =>
val tp = state.topicPartition
if (state.value.isTruncatingLog) {
- getReplica(tp).flatMap(_.epochs).map(_.latestEpoch) match {
+ latestEpoch(tp) match {
case Some(latestEpoch) => partitionsWithEpochs += tp -> latestEpoch
case None => partitionsWithoutEpochs += tp
}
@@ -194,6 +207,12 @@ abstract class AbstractFetcherThread(name: String,
ResultWithPartitions(fetchOffsets, partitionsWithError)
}
+ private def truncate(topicPartition: TopicPartition, epochEndOffset:
EpochEndOffset): OffsetTruncationState = {
+ val offsetTruncationState = getOffsetTruncationState(topicPartition,
epochEndOffset)
+ truncate(topicPartition, offsetTruncationState)
+ offsetTruncationState
+ }
+
private def processFetchRequest(fetchStates: Map[TopicPartition,
PartitionFetchState],
fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
@@ -201,7 +220,7 @@ abstract class AbstractFetcherThread(name: String,
try {
trace(s"Sending fetch request $fetchRequest")
- responseData = fetch(fetchRequest)
+ responseData = fetchFromLeader(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning) {
@@ -220,7 +239,6 @@ abstract class AbstractFetcherThread(name: String,
if (responseData.nonEmpty) {
// process fetched data
inLock(partitionMapLock) {
-
responseData.foreach { case (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach {
currentPartitionFetchState =>
// It's possible that a partition is removed and re-added or
truncated when there is a pending fetch request.
@@ -231,28 +249,31 @@ abstract class AbstractFetcherThread(name: String,
partitionData.error match {
case Errors.NONE =>
try {
- val records = toMemoryRecords(partitionData.records)
- val newOffset =
records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
- currentPartitionFetchState.fetchOffset)
-
- fetcherLagStats.getAndMaybePut(topicPartition).lag =
Math.max(0L, partitionData.highWatermark - newOffset)
// Once we hand off the partition data to the subclass, we
can't mess with it any more in this thread
- processPartitionData(topicPartition,
currentPartitionFetchState.fetchOffset, partitionData, records)
-
- val validBytes = records.validBytes
- // ReplicaDirAlterThread may have removed topicPartition
from the partitionStates after processing the partition data
- if (validBytes > 0 &&
partitionStates.contains(topicPartition)) {
- // Update partitionStates only if there is no exception
during processPartitionData
- partitionStates.updateAndMoveToEnd(topicPartition, new
PartitionFetchState(newOffset))
- fetcherStats.byteRate.mark(validBytes)
+ val logAppendInfoOpt =
processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset,
+ partitionData)
+
+ logAppendInfoOpt.foreach { logAppendInfo =>
+ val nextOffset = logAppendInfo.lastOffset + 1
+ fetcherLagStats.getAndMaybePut(topicPartition).lag =
Math.max(0L, partitionData.highWatermark - nextOffset)
+
+ val validBytes = logAppendInfo.validBytes
+ // ReplicaDirAlterThread may have removed topicPartition
from the partitionStates after processing the partition data
+ if (validBytes > 0 &&
partitionStates.contains(topicPartition)) {
+ // Update partitionStates only if there is no
exception during processPartitionData
+ partitionStates.updateAndMoveToEnd(topicPartition, new
PartitionFetchState(nextOffset))
+ fetcherStats.byteRate.mark(validBytes)
+ }
}
} catch {
case ime: CorruptRecordException =>
// we log the error and continue. This ensures two things
- // 1. If there is a corrupt message in a topic
partition, it does not bring the fetcher thread down and cause other topic
partition to also lag
- // 2. If the message is corrupt due to a transient state
in the log (truncation, partial writes can cause this), we simply continue and
- // should get fixed in the subsequent fetches
- error(s"Found invalid messages during fetch for
partition $topicPartition offset ${currentPartitionFetchState.fetchOffset}",
ime)
+                      // 1. If there is a corrupt message in a topic
partition, it does not bring the fetcher thread
+                      //    down and cause other topic partitions to also lag
+ // 2. If the message is corrupt due to a transient state
in the log (truncation, partial writes
+ // can cause this), we simply continue and should get
fixed in the subsequent fetches
+ error(s"Found invalid messages during fetch for
partition $topicPartition " +
+ s"offset ${currentPartitionFetchState.fetchOffset}",
ime)
partitionsWithError += topicPartition
case e: KafkaStorageException =>
error(s"Error while processing data for partition
$topicPartition", e)
@@ -297,8 +318,6 @@ abstract class AbstractFetcherThread(name: String,
}
def markPartitionsForTruncation(topicPartition: TopicPartition,
truncationOffset: Long) {
- if (!includeLogTruncation)
- throw new IllegalStateException("Truncation should not be requested if
includeLogTruncation is disabled")
partitionMapLock.lockInterruptibly()
try {
Option(partitionStates.stateValue(topicPartition)).foreach { state =>
@@ -318,9 +337,9 @@ abstract class AbstractFetcherThread(name: String,
}.map { case (tp, initialFetchOffset) =>
val fetchState =
if (initialFetchOffset < 0)
- new PartitionFetchState(handleOffsetOutOfRange(tp),
includeLogTruncation)
+ new PartitionFetchState(handleOffsetOutOfRange(tp), truncatingLog
= true)
else
- new PartitionFetchState(initialFetchOffset, includeLogTruncation)
+ new PartitionFetchState(initialFetchOffset, truncatingLog = true)
tp -> fetchState
}
@@ -341,11 +360,11 @@ abstract class AbstractFetcherThread(name: String,
private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets:
Map[TopicPartition, OffsetTruncationState]) {
val newStates: Map[TopicPartition, PartitionFetchState] =
partitionStates.partitionStates.asScala
.map { state =>
- val maybeTruncationComplete = fetchOffsets.get(state.topicPartition())
match {
+ val maybeTruncationComplete = fetchOffsets.get(state.topicPartition)
match {
case Some(offsetTruncationState) =>
PartitionFetchState(offsetTruncationState.offset, state.value.delay,
truncatingLog = !offsetTruncationState.truncationCompleted)
case None => state.value()
}
- (state.topicPartition(), maybeTruncationComplete)
+ (state.topicPartition, maybeTruncationComplete)
}.toMap
partitionStates.set(newStates.asJava)
}
@@ -372,57 +391,121 @@ abstract class AbstractFetcherThread(name: String,
*
* @param tp Topic partition
* @param leaderEpochOffset Epoch end offset received from the leader
for this topic partition
- * @param replica Follower's replica, which is either local
replica
- * (ReplicaFetcherThread) or future replica
(ReplicaAlterLogDirsThread)
- * @param isFutureReplica true if called from ReplicaAlterLogDirsThread
*/
- def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset:
EpochEndOffset, replica: Replica, isFutureReplica: Boolean = false):
OffsetTruncationState = {
- // to make sure we can distinguish log output for fetching from remote
leader or local replica
- val followerName = if (isFutureReplica) "future replica" else "follower"
-
+ private def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset:
EpochEndOffset): OffsetTruncationState = {
if (leaderEpochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
// truncate to initial offset which is the high watermark for follower
replica. For
// future replica, it is either high watermark of the future replica or
current
// replica's truncation offset (when the current replica truncates, it
forces future
// replica's partition state to 'truncating' and sets initial offset to
its truncation offset)
- warn(s"Based on $followerName's leader epoch, leader replied with an
unknown offset in ${replica.topicPartition}. " +
+ warn(s"Based on replica's leader epoch, leader replied with an unknown
offset in $tp. " +
s"The initial fetch offset
${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset,
truncationCompleted = true)
} else if (leaderEpochOffset.leaderEpoch == UNDEFINED_EPOCH) {
// either leader or follower or both use inter-broker protocol version <
KAFKA_2_0_IV0
// (version 0 of OffsetForLeaderEpoch request/response)
- warn(s"Leader or $followerName is on protocol version where leader epoch
is not considered in the OffsetsForLeaderEpoch response. " +
- s"The leader's offset ${leaderEpochOffset.endOffset} will be used
for truncation in ${replica.topicPartition}.")
- OffsetTruncationState(min(leaderEpochOffset.endOffset,
replica.logEndOffset.messageOffset), truncationCompleted = true)
+ warn(s"Leader or replica is on protocol version where leader epoch is
not considered in the OffsetsForLeaderEpoch response. " +
+ s"The leader's offset ${leaderEpochOffset.endOffset} will be used
for truncation in $tp.")
+ OffsetTruncationState(min(leaderEpochOffset.endOffset,
logEndOffset(tp)), truncationCompleted = true)
} else {
+ val replicaEndOffset = logEndOffset(tp)
+
// get (leader epoch, end offset) pair that corresponds to the largest
leader epoch
// less than or equal to the requested epoch.
- val (followerEpoch, followerEndOffset) =
replica.epochs.get.endOffsetFor(leaderEpochOffset.leaderEpoch)
- if (followerEndOffset == UNDEFINED_EPOCH_OFFSET) {
- // This can happen if the follower was not tracking leader epochs at
that point (before the
- // upgrade, or if this broker is new). Since the leader replied with
epoch <
- // requested epoch from follower, so should be safe to truncate to
leader's
- // offset (this is the same behavior as post-KIP-101 and pre-KIP-279)
- warn(s"Based on $followerName's leader epoch, leader replied with
epoch ${leaderEpochOffset.leaderEpoch} " +
- s"below any $followerName's tracked epochs for
${replica.topicPartition}. " +
- s"The leader's offset only ${leaderEpochOffset.endOffset} will be
used for truncation.")
- OffsetTruncationState(min(leaderEpochOffset.endOffset,
replica.logEndOffset.messageOffset), truncationCompleted = true)
- } else if (followerEpoch != leaderEpochOffset.leaderEpoch) {
- // the follower does not know about the epoch that leader replied with
- // we truncate to the end offset of the largest epoch that is smaller
than the
- // epoch the leader replied with, and send another offset for leader
epoch request
- val intermediateOffsetToTruncateTo = min(followerEndOffset,
replica.logEndOffset.messageOffset)
- info(s"Based on $followerName's leader epoch, leader replied with
epoch ${leaderEpochOffset.leaderEpoch} " +
- s"unknown to the $followerName for ${replica.topicPartition}. " +
- s"Will truncate to $intermediateOffsetToTruncateTo and send
another leader epoch request to the leader.")
- OffsetTruncationState(intermediateOffsetToTruncateTo,
truncationCompleted = false)
- } else {
- val offsetToTruncateTo = min(followerEndOffset,
leaderEpochOffset.endOffset)
- OffsetTruncationState(min(offsetToTruncateTo,
replica.logEndOffset.messageOffset), truncationCompleted = true)
+ endOffsetForEpoch(tp, leaderEpochOffset.leaderEpoch) match {
+ case Some(OffsetAndEpoch(followerEndOffset, followerEpoch)) =>
+ if (followerEpoch != leaderEpochOffset.leaderEpoch) {
+ // the follower does not know about the epoch that leader replied
with
+ // we truncate to the end offset of the largest epoch that is
smaller than the
+ // epoch the leader replied with, and send another offset for
leader epoch request
+ val intermediateOffsetToTruncateTo = min(followerEndOffset,
replicaEndOffset)
+ info(s"Based on replica's leader epoch, leader replied with epoch
${leaderEpochOffset.leaderEpoch} " +
+ s"unknown to the replica for $tp. " +
+ s"Will truncate to $intermediateOffsetToTruncateTo and send
another leader epoch request to the leader.")
+ OffsetTruncationState(intermediateOffsetToTruncateTo,
truncationCompleted = false)
+ } else {
+ val offsetToTruncateTo = min(followerEndOffset,
leaderEpochOffset.endOffset)
+ OffsetTruncationState(min(offsetToTruncateTo, replicaEndOffset),
truncationCompleted = true)
+ }
+ case None =>
+ // This can happen if the follower was not tracking leader epochs at
that point (before the
+ // upgrade, or if this broker is new). Since the leader replied with
epoch <
+          // requested epoch from follower, it should be safe to truncate to
leader's
+ // offset (this is the same behavior as post-KIP-101 and pre-KIP-279)
+ warn(s"Based on replica's leader epoch, leader replied with epoch
${leaderEpochOffset.leaderEpoch} " +
+ s"below any replica's tracked epochs for $tp. " +
+ s"The leader's offset only ${leaderEpochOffset.endOffset} will be
used for truncation.")
+ OffsetTruncationState(min(leaderEpochOffset.endOffset,
replicaEndOffset), truncationCompleted = true)
}
}
}
+ /**
+ * Handle a partition whose offset is out of range and return a new fetch
offset.
+ */
+ protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long =
{
+ val replicaEndOffset = logEndOffset(topicPartition)
+
+ /**
+ * Unclean leader election: A follower goes down, in the meanwhile the
leader keeps appending messages. The follower comes back up
+ * and before it has completely caught up with the leader's logs, all
replicas in the ISR go down. The follower is now uncleanly
+ * elected as the new leader, and it starts appending messages from the
client. The old leader comes back up, becomes a follower
+ * and it may discover that the current leader's end offset is behind its
own end offset.
+ *
+ * In such a case, truncate the current follower's log to the current
leader's end offset and continue fetching.
+ *
+ * There is a potential for a mismatch between the logs of the two
replicas here. We don't fix this mismatch as of now.
+ */
+ val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
+ if (leaderEndOffset < replicaEndOffset) {
+ // Prior to truncating the follower's log, ensure that doing so is not
disallowed by the configuration for unclean leader election.
+ // This situation could only happen if the unclean election
configuration for a topic changes while a replica is down. Otherwise,
+ // we should never encounter this situation since a non-ISR leader
cannot be elected if disallowed by the broker configuration.
+ if (!isUncleanLeaderElectionAllowed(topicPartition)) {
+ // Log a fatal error and shutdown the broker to ensure that data loss
does not occur unexpectedly.
+ fatal(s"Exiting because log truncation is not allowed for partition
$topicPartition, current leader's " +
s"latest offset $leaderEndOffset is less than replica's latest
offset $replicaEndOffset")
+ throw new FatalExitError
+ }
+
+ warn(s"Reset fetch offset for partition $topicPartition from
$replicaEndOffset to current " +
+ s"leader's latest offset $leaderEndOffset")
+ truncate(topicPartition, new EpochEndOffset(Errors.NONE,
UNDEFINED_EPOCH, leaderEndOffset))
+ leaderEndOffset
+ } else {
+ /**
+ * If the leader's log end offset is greater than the follower's log end
offset, there are two possibilities:
+ * 1. The follower could have been down for a long time and when it
starts up, its end offset could be smaller than the leader's
+ * start offset because the leader has deleted old logs
(log.logEndOffset < leaderStartOffset).
+ * 2. When unclean leader election occurs, it is possible that the old
leader's high watermark is greater than
+ * the new leader's log end offset. So when the old leader truncates its
offset to its high watermark and starts
+ * to fetch from the new leader, an OffsetOutOfRangeException will be
thrown. After that some more messages are
+ * produced to the new leader. While the old leader is trying to handle
the OffsetOutOfRangeException and query
+ * the log end offset of the new leader, the new leader's log end offset
becomes higher than the follower's log end offset.
+ *
+ * In the first case, the follower's current log end offset is smaller
than the leader's log start offset. So the
+ * follower should truncate all its logs, roll out a new segment and
start to fetch from the current leader's log
+ * start offset.
+ * In the second case, the follower should just keep the current log
segments and retry the fetch. In the second
+ * case, there will be some inconsistency of data between old and new
leader. We are not solving it here.
+ * If users want to have strong consistency guarantees, appropriate
configurations needs to be set for both
+ * brokers and producers.
+ *
+ * Putting the two cases together, the follower should fetch from the
higher one of its replica log end offset
+ * and the current leader's log start offset.
+ */
+ val leaderStartOffset = fetchEarliestOffsetFromLeader(topicPartition)
+ warn(s"Reset fetch offset for partition $topicPartition from
$replicaEndOffset to current " +
+ s"leader's start offset $leaderStartOffset")
+ val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+ // Only truncate log when current leader's log start offset is greater
than follower's log end offset.
+ if (leaderStartOffset > replicaEndOffset)
+ truncateFullyAndStartAt(topicPartition, leaderStartOffset)
+ offsetToFetch
+ }
+ }
+
+
def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
partitionMapLock.lockInterruptibly()
try {
@@ -459,7 +542,7 @@ abstract class AbstractFetcherThread(name: String,
}.toMap
}
- private def toMemoryRecords(records: Records): MemoryRecords = {
+ protected def toMemoryRecords(records: Records): MemoryRecords = {
records match {
case r: MemoryRecords => r
case r: FileRecords =>
@@ -587,3 +670,5 @@ case class OffsetTruncationState(offset: Long,
truncationCompleted: Boolean) {
override def toString = "offset:%d-truncationCompleted:%b".format(offset,
truncationCompleted)
}
+
+case class OffsetAndEpoch(offset: Long, epoch: Int)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 1621201..5aec7a9 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -20,13 +20,14 @@ package kafka.server
import java.util
import kafka.api.Request
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.BrokerEndPoint
+import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{MemoryRecords, Records}
+import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest,
FetchResponse}
@@ -44,18 +45,32 @@ class ReplicaAlterLogDirsThread(name: String,
clientId = name,
sourceBroker = sourceBroker,
fetchBackOffMs =
brokerConfig.replicaFetchBackoffMs,
- isInterruptible = false,
- includeLogTruncation = true) {
+ isInterruptible = false) {
private val replicaId = brokerConfig.brokerId
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
- protected def getReplica(tp: TopicPartition): Option[Replica] = {
- replicaMgr.getReplica(tp, Request.FutureLocalReplicaId)
+ override protected def latestEpoch(topicPartition: TopicPartition):
Option[Int] = {
+ replicaMgr.getReplicaOrException(topicPartition,
Request.FutureLocalReplicaId).epochs.map(_.latestEpoch)
}
- def fetch(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
+ override protected def logEndOffset(topicPartition: TopicPartition): Long = {
+ replicaMgr.getReplicaOrException(topicPartition,
Request.FutureLocalReplicaId).logEndOffset.messageOffset
+ }
+
+ override protected def endOffsetForEpoch(topicPartition: TopicPartition,
epoch: Int): Option[OffsetAndEpoch] = {
+ val replica = replicaMgr.getReplicaOrException(topicPartition,
Request.FutureLocalReplicaId)
+ replica.epochs.flatMap { epochCache =>
+ val (foundEpoch, foundOffset) = epochCache.endOffsetFor(epoch)
+ if (foundOffset == UNDEFINED_EPOCH_OFFSET)
+ None
+ else
+ Some(OffsetAndEpoch(foundOffset, foundEpoch))
+ }
+ }
+
+ def fetchFromLeader(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] = {
var partitionData: Seq[(TopicPartition,
FetchResponse.PartitionData[Records])] = null
val request = fetchRequest.build()
@@ -86,16 +101,18 @@ class ReplicaAlterLogDirsThread(name: String,
}
// process fetched data
- def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long,
partitionData: PartitionData[Records],
- records: MemoryRecords) {
+ override def processPartitionData(topicPartition: TopicPartition,
+ fetchOffset: Long,
+ partitionData: PartitionData[Records]):
Option[LogAppendInfo] = {
val futureReplica = replicaMgr.getReplicaOrException(topicPartition,
Request.FutureLocalReplicaId)
val partition = replicaMgr.getPartition(topicPartition).get
+ val records = toMemoryRecords(partitionData.records)
if (fetchOffset != futureReplica.logEndOffset.messageOffset)
throw new IllegalStateException("Offset mismatch for the future replica
%s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
- partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
+ val logAppendInfo =
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
val futureReplicaHighWatermark =
futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
futureReplica.highWatermark = new
LogOffsetMetadata(futureReplicaHighWatermark)
futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
@@ -104,29 +121,17 @@ class ReplicaAlterLogDirsThread(name: String,
removePartitions(Set(topicPartition))
quota.record(records.sizeInBytes)
+ logAppendInfo
}
- def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
- val futureReplica = replicaMgr.getReplicaOrException(topicPartition,
Request.FutureLocalReplicaId)
- val currentReplica = replicaMgr.getReplicaOrException(topicPartition)
- val partition = replicaMgr.getPartition(topicPartition).get
- val logEndOffset: Long = currentReplica.logEndOffset.messageOffset
+ override protected def isUncleanLeaderElectionAllowed(topicPartition:
TopicPartition): Boolean = true
- if (logEndOffset < futureReplica.logEndOffset.messageOffset) {
- warn("Future replica for partition %s reset its fetch offset from %d to
current replica's latest offset %d"
- .format(topicPartition, futureReplica.logEndOffset.messageOffset,
logEndOffset))
- partition.truncateTo(logEndOffset, isFuture = true)
- logEndOffset
- } else {
- val currentReplicaStartOffset: Long = currentReplica.logStartOffset
- warn("Future replica for partition %s reset its fetch offset from %d to
current replica's start offset %d"
- .format(topicPartition, futureReplica.logEndOffset.messageOffset,
currentReplicaStartOffset))
- val offsetToFetch = Math.max(currentReplicaStartOffset,
futureReplica.logEndOffset.messageOffset)
- // Only truncate the log when current replica's log start offset is
greater than future replica's log end offset.
- if (currentReplicaStartOffset > futureReplica.logEndOffset.messageOffset)
- partition.truncateFullyAndStartAt(currentReplicaStartOffset, isFuture
= true)
- offsetToFetch
- }
+ override protected def fetchEarliestOffsetFromLeader(topicPartition:
TopicPartition): Long = {
+ replicaMgr.getReplicaOrException(topicPartition).logStartOffset
+ }
+
+ override protected def fetchLatestOffsetFromLeader(topicPartition:
TopicPartition): Long = {
+ replicaMgr.getReplicaOrException(topicPartition).logEndOffset.messageOffset
}
/**
@@ -134,7 +139,7 @@ class ReplicaAlterLogDirsThread(name: String,
* @param partitions map of topic partition -> leader epoch of the future
replica
* @return map of topic partition -> end offset for a requested leader epoch
*/
- def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset] = {
+ override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset] = {
partitions.map { case (tp, epoch) =>
try {
val (leaderEpoch, leaderOffset) =
replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch)
@@ -161,14 +166,14 @@ class ReplicaAlterLogDirsThread(name: String,
* the future replica may miss "mark for truncation" event and must use the
offset for leader epoch
* exchange with the current replica to truncate to the largest common log
prefix for the topic partition
*/
- override def truncate(topicPartition: TopicPartition, epochEndOffset:
EpochEndOffset): OffsetTruncationState = {
- val futureReplica = replicaMgr.getReplicaOrException(topicPartition,
Request.FutureLocalReplicaId)
+ override def truncate(topicPartition: TopicPartition, truncationState:
OffsetTruncationState): Unit = {
val partition = replicaMgr.getPartition(topicPartition).get
+ partition.truncateTo(truncationState.offset, isFuture = true)
+ }
- val offsetTruncationState = getOffsetTruncationState(topicPartition,
epochEndOffset, futureReplica,
- isFutureReplica = true)
- partition.truncateTo(offsetTruncationState.offset, isFuture = true)
- offsetTruncationState
+ override protected def truncateFullyAndStartAt(topicPartition:
TopicPartition, offset: Long): Unit = {
+ val partition = replicaMgr.getPartition(topicPartition).get
+ partition.truncateFullyAndStartAt(offset, isFuture = true)
}
def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]):
ResultWithPartitions[Option[FetchRequest.Builder]] = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5624e84..1848eb7 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,14 +18,13 @@
package kafka.server
import kafka.api._
-import kafka.cluster.{BrokerEndPoint, Replica}
-import kafka.log.LogConfig
+import kafka.cluster.BrokerEndPoint
+import kafka.log.{LogAppendInfo, LogConfig}
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.zk.AdminZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, Records}
@@ -49,8 +48,7 @@ class ReplicaFetcherThread(name: String,
clientId = name,
sourceBroker = sourceBroker,
fetchBackOffMs =
brokerConfig.replicaFetchBackoffMs,
- isInterruptible = false,
- includeLogTruncation = true) {
+ isInterruptible = false) {
private val replicaId = brokerConfig.brokerId
private val logContext = new LogContext(s"[ReplicaFetcher
replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
@@ -88,12 +86,26 @@ class ReplicaFetcherThread(name: String,
private val minBytes = brokerConfig.replicaFetchMinBytes
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
- private val brokerSupportsLeaderEpochRequest: Boolean =
brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
-
+ private val brokerSupportsLeaderEpochRequest =
brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
private val fetchSessionHandler = new FetchSessionHandler(logContext,
sourceBroker.id)
- protected def getReplica(tp: TopicPartition): Option[Replica] = {
- replicaMgr.getReplica(tp)
+ override protected def latestEpoch(topicPartition: TopicPartition):
Option[Int] = {
+ replicaMgr.getReplicaOrException(topicPartition).epochs.map(_.latestEpoch)
+ }
+
+ override protected def logEndOffset(topicPartition: TopicPartition): Long = {
+ replicaMgr.getReplicaOrException(topicPartition).logEndOffset.messageOffset
+ }
+
+ override protected def endOffsetForEpoch(topicPartition: TopicPartition,
epoch: Int): Option[OffsetAndEpoch] = {
+ val replica = replicaMgr.getReplicaOrException(topicPartition)
+ replica.epochs.flatMap { epochCache =>
+ val (foundEpoch, foundOffset) = epochCache.endOffsetFor(epoch)
+ if (foundOffset == UNDEFINED_EPOCH_OFFSET)
+ None
+ else
+ Some(OffsetAndEpoch(foundOffset, foundEpoch))
+ }
}
override def initiateShutdown(): Boolean = {
@@ -105,9 +117,12 @@ class ReplicaFetcherThread(name: String,
}
// process fetched data
- def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long,
partitionData: PD, records: MemoryRecords) {
+ override def processPartitionData(topicPartition: TopicPartition,
+ fetchOffset: Long,
+ partitionData: PD): Option[LogAppendInfo]
= {
val replica = replicaMgr.getReplicaOrException(topicPartition)
val partition = replicaMgr.getPartition(topicPartition).get
+ val records = toMemoryRecords(partitionData.records)
maybeWarnIfOversizedRecords(records, topicPartition)
@@ -120,7 +135,7 @@ class ReplicaFetcherThread(name: String,
.format(replica.logEndOffset.messageOffset, topicPartition,
records.sizeInBytes, partitionData.highWatermark))
// Append the leader's messages to the log
- partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
+ val logAppendInfo =
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
if (isTraceEnabled)
trace("Follower has replica log end offset %d after appending %d bytes
of messages for partition %s"
@@ -140,6 +155,8 @@ class ReplicaFetcherThread(name: String,
if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
+
+ logAppendInfo
}
def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition:
TopicPartition): Unit = {
@@ -151,79 +168,13 @@ class ReplicaFetcherThread(name: String,
"equal or larger than your settings for max.message.bytes, both at a
broker and topic level.")
}
- /**
- * Handle a partition whose offset is out of range and return a new fetch
offset.
- */
- def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
- val replica = replicaMgr.getReplicaOrException(topicPartition)
- val partition = replicaMgr.getPartition(topicPartition).get
-
- /**
- * Unclean leader election: A follower goes down, in the meanwhile the
leader keeps appending messages. The follower comes back up
- * and before it has completely caught up with the leader's logs, all
replicas in the ISR go down. The follower is now uncleanly
- * elected as the new leader, and it starts appending messages from the
client. The old leader comes back up, becomes a follower
- * and it may discover that the current leader's end offset is behind its
own end offset.
- *
- * In such a case, truncate the current follower's log to the current
leader's end offset and continue fetching.
- *
- * There is a potential for a mismatch between the logs of the two
replicas here. We don't fix this mismatch as of now.
- */
- val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition,
ListOffsetRequest.LATEST_TIMESTAMP)
-
- if (leaderEndOffset < replica.logEndOffset.messageOffset) {
- // Prior to truncating the follower's log, ensure that doing so is not
disallowed by the configuration for unclean leader election.
- // This situation could only happen if the unclean election
configuration for a topic changes while a replica is down. Otherwise,
- // we should never encounter this situation since a non-ISR leader
cannot be elected if disallowed by the broker configuration.
- val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
- if (!LogConfig.fromProps(brokerConfig.originals,
adminZkClient.fetchEntityConfig(
- ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
- // Log a fatal error and shutdown the broker to ensure that data loss
does not occur unexpectedly.
- fatal(s"Exiting because log truncation is not allowed for partition
$topicPartition, current leader's " +
- s"latest offset $leaderEndOffset is less than replica's latest
offset ${replica.logEndOffset.messageOffset}")
- throw new FatalExitError
- }
-
- warn(s"Reset fetch offset for partition $topicPartition from
${replica.logEndOffset.messageOffset} to current " +
- s"leader's latest offset $leaderEndOffset")
- partition.truncateTo(leaderEndOffset, isFuture = false)
-
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId,
topicPartition, leaderEndOffset)
- leaderEndOffset
- } else {
- /**
- * If the leader's log end offset is greater than the follower's log end
offset, there are two possibilities:
- * 1. The follower could have been down for a long time and when it
starts up, its end offset could be smaller than the leader's
- * start offset because the leader has deleted old logs
(log.logEndOffset < leaderStartOffset).
- * 2. When unclean leader election occurs, it is possible that the old
leader's high watermark is greater than
- * the new leader's log end offset. So when the old leader truncates its
offset to its high watermark and starts
- * to fetch from the new leader, an OffsetOutOfRangeException will be
thrown. After that some more messages are
- * produced to the new leader. While the old leader is trying to handle
the OffsetOutOfRangeException and query
- * the log end offset of the new leader, the new leader's log end offset
becomes higher than the follower's log end offset.
- *
- * In the first case, the follower's current log end offset is smaller
than the leader's log start offset. So the
- * follower should truncate all its logs, roll out a new segment and
start to fetch from the current leader's log
- * start offset.
- * In the second case, the follower should just keep the current log
segments and retry the fetch. In the second
- * case, there will be some inconsistency of data between old and new
leader. We are not solving it here.
- * If users want to have strong consistency guarantees, appropriate
configurations needs to be set for both
- * brokers and producers.
- *
- * Putting the two cases together, the follower should fetch from the
higher one of its replica log end offset
- * and the current leader's log start offset.
- *
- */
- val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition,
ListOffsetRequest.EARLIEST_TIMESTAMP)
- warn(s"Reset fetch offset for partition $topicPartition from
${replica.logEndOffset.messageOffset} to current " +
- s"leader's start offset $leaderStartOffset")
- val offsetToFetch = Math.max(leaderStartOffset,
replica.logEndOffset.messageOffset)
- // Only truncate log when current leader's log start offset is greater
than follower's log end offset.
- if (leaderStartOffset > replica.logEndOffset.messageOffset) {
- partition.truncateFullyAndStartAt(leaderStartOffset, isFuture = false)
- }
- offsetToFetch
- }
+ override protected def isUncleanLeaderElectionAllowed(topicPartition:
TopicPartition): Boolean = {
+ val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
+ LogConfig.fromProps(brokerConfig.originals,
adminZkClient.fetchEntityConfig(
+ ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable
}
- protected def fetch(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] = {
+ override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] = {
try {
val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
val fetchResponse =
clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
@@ -239,7 +190,15 @@ class ReplicaFetcherThread(name: String,
}
}
- private def earliestOrLatestOffset(topicPartition: TopicPartition,
earliestOrLatest: Long): Long = {
+ override protected def fetchEarliestOffsetFromLeader(topicPartition:
TopicPartition): Long = {
+ fetchOffsetFromLeader(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
+ }
+
+ override protected def fetchLatestOffsetFromLeader(topicPartition:
TopicPartition): Long = {
+ fetchOffsetFromLeader(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)
+ }
+
+ private def fetchOffsetFromLeader(topicPartition: TopicPartition,
earliestOrLatest: Long): Long = {
val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >=
KAFKA_0_10_1_IV2) {
val partitions = Map(topicPartition -> (earliestOrLatest:
java.lang.Long))
ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion,
replicaId).setTargetTimes(partitions.asJava)
@@ -299,19 +258,23 @@ class ReplicaFetcherThread(name: String,
* Truncate the log for each partition's epoch based on leader's returned
epoch and offset.
* The logic for finding the truncation offset is implemented in
AbstractFetcherThread.getOffsetTruncationState
*/
- override def truncate(tp: TopicPartition, epochEndOffset: EpochEndOffset):
OffsetTruncationState = {
+ override def truncate(tp: TopicPartition, offsetTruncationState:
OffsetTruncationState): Unit = {
val replica = replicaMgr.getReplicaOrException(tp)
val partition = replicaMgr.getPartition(tp).get
+ partition.truncateTo(offsetTruncationState.offset, isFuture = false)
- val offsetTruncationState = getOffsetTruncationState(tp, epochEndOffset,
replica)
if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
- warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below
high watermark ${replica.highWatermark.messageOffset}")
- partition.truncateTo(offsetTruncationState.offset, isFuture = false)
+ warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below
high watermark " +
+ s"${replica.highWatermark.messageOffset}")
// mark the future replica for truncation only when we do last truncation
if (offsetTruncationState.truncationCompleted)
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId,
tp, offsetTruncationState.offset)
- offsetTruncationState
+ }
+
+ override protected def truncateFullyAndStartAt(topicPartition:
TopicPartition, offset: Long): Unit = {
+ val partition = replicaMgr.getPartition(topicPartition).get
+ partition.truncateFullyAndStartAt(offset, isFuture = false)
}
override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset] = {
@@ -360,4 +323,5 @@ class ReplicaFetcherThread(name: String,
val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition)
quota.isThrottled(topicPartition) && quota.isQuotaExceeded &&
!isReplicaInSync
}
+
}
diff --git
a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 6fcf0cc..392c912 100644
---
a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++
b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -88,7 +88,7 @@ class ReplicaFetcherThreadFatalErrorTest extends
ZooKeeperTestHarness {
import params._
new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config,
replicaManager, metrics, time, quotaManager) {
override def handleOffsetOutOfRange(topicPartition: TopicPartition):
Long = throw new FatalExitError
- override protected def fetch(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] = {
+ override protected def fetchFromLeader(fetchRequest:
FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
fetchRequest.fetchData.asScala.keys.toSeq.map { tp =>
(tp, new
FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 15abc68..c456433 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -17,20 +17,26 @@
package kafka.server
+import java.nio.ByteBuffer
+
import AbstractFetcherThread._
import com.yammer.metrics.Metrics
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.BrokerEndPoint
+import kafka.log.LogAppendInfo
+import kafka.message.NoCompressionCodec
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
Records, SimpleRecord}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest}
-import org.apache.kafka.common.requests.FetchResponse.PartitionData
-import org.junit.Assert.{assertFalse, assertTrue}
+import org.apache.kafka.common.utils.Time
+import org.junit.Assert._
import org.junit.{Before, Test}
import scala.collection.JavaConverters._
import scala.collection.{Map, Set, mutable}
+import scala.util.Random
class AbstractFetcherThreadTest {
@@ -40,174 +46,412 @@ class AbstractFetcherThreadTest {
Metrics.defaultRegistry().removeMetric(metricName)
}
+ private def allMetricsNames: Set[String] =
Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
+
+ private def mkBatch(baseOffset: Long, leaderEpoch: Int, records:
SimpleRecord*): RecordBatch = {
+ MemoryRecords.withRecords(baseOffset, CompressionType.NONE, leaderEpoch,
records: _*)
+ .batches.asScala.head
+ }
+
@Test
- def testMetricsRemovedOnShutdown() {
+ def testMetricsRemovedOnShutdown(): Unit = {
val partition = new TopicPartition("topic", 0)
- val fetcherThread = new DummyFetcherThread("dummy", "client", new
BrokerEndPoint(0, "localhost", 9092))
-
- fetcherThread.start()
+ val fetcher = new MockFetcherThread
// add one partition to create the consumer lag metric
- fetcherThread.addPartitions(Map(partition -> 0L))
+ fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+ fetcher.addPartitions(Map(partition -> 0L))
+ fetcher.setLeaderState(partition, MockFetcherThread.PartitionState())
+
+ fetcher.start()
// wait until all fetcher metrics are present
TestUtils.waitUntilTrue(() =>
allMetricsNames == Set(FetcherMetrics.BytesPerSec,
FetcherMetrics.RequestsPerSec, FetcherMetrics.ConsumerLag),
"Failed waiting for all fetcher metrics to be registered")
- fetcherThread.shutdown()
+ fetcher.shutdown()
// after shutdown, they should be gone
assertTrue(Metrics.defaultRegistry().allMetrics().isEmpty)
}
@Test
- def testConsumerLagRemovedWithPartition() {
+ def testConsumerLagRemovedWithPartition(): Unit = {
val partition = new TopicPartition("topic", 0)
- val fetcherThread = new DummyFetcherThread("dummy", "client", new
BrokerEndPoint(0, "localhost", 9092))
-
- fetcherThread.start()
+ val fetcher = new MockFetcherThread
// add one partition to create the consumer lag metric
- fetcherThread.addPartitions(Map(partition -> 0L))
+ fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+ fetcher.addPartitions(Map(partition -> 0L))
+ fetcher.setLeaderState(partition, MockFetcherThread.PartitionState())
+
+ fetcher.doWork()
- // wait until lag metric is present
- TestUtils.waitUntilTrue(() => allMetricsNames(FetcherMetrics.ConsumerLag),
- "Failed waiting for consumer lag metric")
+ assertTrue("Failed waiting for consumer lag metric",
+ allMetricsNames(FetcherMetrics.ConsumerLag))
// remove the partition to simulate leader migration
- fetcherThread.removePartitions(Set(partition))
+ fetcher.removePartitions(Set(partition))
// the lag metric should now be gone
assertFalse(allMetricsNames(FetcherMetrics.ConsumerLag))
+ }
+
+ @Test
+ def testSimpleFetch(): Unit = {
+ val partition = new TopicPartition("topic", 0)
+ val fetcher = new MockFetcherThread
+
+ fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+ fetcher.addPartitions(Map(partition -> 0L))
+
+ val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+ new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+ val leaderState = MockFetcherThread.PartitionState(Seq(batch),
highWatermark = 2L)
+ fetcher.setLeaderState(partition, leaderState)
+
+ fetcher.doWork()
+
+ val replicaState = fetcher.replicaPartitionState(partition)
+ assertEquals(2L, replicaState.logEndOffset)
+ assertEquals(2L, replicaState.highWatermark)
+ }
+
+ @Test
+ def testTruncation(): Unit = {
+ val partition = new TopicPartition("topic", 0)
+ val fetcher = new MockFetcherThread
+
+ val replicaLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+ mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+ mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+ val replicaState = MockFetcherThread.PartitionState(replicaLog,
highWatermark = 0L)
+ fetcher.setReplicaState(partition, replicaState)
+ fetcher.addPartitions(Map(partition -> 3L))
+
+ val leaderLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 1, new SimpleRecord("a".getBytes)),
+ mkBatch(baseOffset = 1, leaderEpoch = 3, new SimpleRecord("b".getBytes)),
+ mkBatch(baseOffset = 2, leaderEpoch = 5, new SimpleRecord("c".getBytes)))
+
+ val leaderState = MockFetcherThread.PartitionState(leaderLog,
highWatermark = 2L)
+ fetcher.setLeaderState(partition, leaderState)
- fetcherThread.shutdown()
+ TestUtils.waitUntilTrue(() => {
+ fetcher.doWork()
+ fetcher.replicaPartitionState(partition).log ==
fetcher.leaderPartitionState(partition).log
+ }, "Failed to reconcile leader and follower logs")
+
+ assertEquals(leaderState.logStartOffset, replicaState.logStartOffset)
+ assertEquals(leaderState.logEndOffset, replicaState.logEndOffset)
+ assertEquals(leaderState.highWatermark, replicaState.highWatermark)
}
- private def allMetricsNames =
Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
+ @Test(expected = classOf[FatalExitError])
+ def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit =
{
+ val partition = new TopicPartition("topic", 0)
+ val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false)
+
+ val replicaLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+ mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+ mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+ val replicaState = MockFetcherThread.PartitionState(replicaLog,
highWatermark = 0L)
+ fetcher.setReplicaState(partition, replicaState)
+ fetcher.addPartitions(Map(partition -> 3L))
+
+ val leaderLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+ mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+ mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
- protected def fetchRequestBuilder(partitionMap:
collection.Map[TopicPartition, PartitionFetchState]): FetchRequest.Builder = {
- val partitionData = partitionMap.map { case (tp, fetchState) =>
- tp -> new FetchRequest.PartitionData(fetchState.fetchOffset, 0, 1024 *
1024)
- }.toMap.asJava
- FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, 0, 0, 1,
partitionData)
+ val leaderState = MockFetcherThread.PartitionState(leaderLog,
highWatermark = 2L)
+ fetcher.setLeaderState(partition, leaderState)
+
+ // initial truncation and verify that the log end offset is updated
+ fetcher.doWork()
+ assertEquals(3L, replicaState.logEndOffset)
+ assertFalse(fetcher.partitionStates.stateValue(partition).isTruncatingLog)
+
+ // To hit this case, we have to change the leader log without going
through the truncation phase
+ leaderState.log.clear()
+ leaderState.logEndOffset = 0L
+ leaderState.logStartOffset = 0L
+ leaderState.highWatermark = 0L
+
+ fetcher.doWork()
}
- class DummyFetcherThread(name: String,
- clientId: String,
- sourceBroker: BrokerEndPoint,
- fetchBackOffMs: Int = 0)
- extends AbstractFetcherThread(name, clientId, sourceBroker,
- fetchBackOffMs,
- isInterruptible = true,
- includeLogTruncation = false) {
+ @Test
+ def testFollowerFetchOutOfRangeLow(): Unit = {
+ val partition = new TopicPartition("topic", 0)
+ val fetcher = new MockFetcherThread
- protected def getReplica(tp: TopicPartition): Option[Replica] = None
+ // The follower begins from an offset which is behind the leader's log
start offset
+ val replicaLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)))
- override def processPartitionData(topicPartition: TopicPartition,
- fetchOffset: Long,
- partitionData: PD,
- records: MemoryRecords): Unit = {}
+ val replicaState = MockFetcherThread.PartitionState(replicaLog,
highWatermark = 0L)
+ fetcher.setReplicaState(partition, replicaState)
+ fetcher.addPartitions(Map(partition -> 3L))
- override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
= 0L
+ val leaderLog = Seq(
+ mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
- override protected def fetch(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] =
- fetchRequest.fetchData.asScala.mapValues(_ => new
PartitionData[Records](Errors.NONE, 0, 0, 0,
- Seq.empty.asJava, MemoryRecords.EMPTY)).toSeq
+ val leaderState = MockFetcherThread.PartitionState(leaderLog,
highWatermark = 2L)
+ fetcher.setLeaderState(partition, leaderState)
- override protected def buildFetch(partitionMap:
collection.Map[TopicPartition, PartitionFetchState]):
ResultWithPartitions[Option[FetchRequest.Builder]] = {
- ResultWithPartitions(Some(fetchRequestBuilder(partitionMap)), Set())
- }
+ // initial truncation and verify that the log start offset is updated
+ fetcher.doWork()
+ assertFalse(fetcher.partitionStates.stateValue(partition).isTruncatingLog)
+ assertEquals(2, replicaState.logStartOffset)
+ assertEquals(List(), replicaState.log.toList)
- override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset] = { Map() }
+ TestUtils.waitUntilTrue(() => {
+ fetcher.doWork()
+ fetcher.replicaPartitionState(partition).log ==
fetcher.leaderPartitionState(partition).log
+ }, "Failed to reconcile leader and follower logs")
- override def truncate(tp: TopicPartition, epochEndOffset: EpochEndOffset):
OffsetTruncationState = {
- OffsetTruncationState(epochEndOffset.endOffset, truncationCompleted =
true)
- }
+ assertEquals(leaderState.logStartOffset, replicaState.logStartOffset)
+ assertEquals(leaderState.logEndOffset, replicaState.logEndOffset)
+ assertEquals(leaderState.highWatermark, replicaState.highWatermark)
}
@Test
- def testFetchRequestCorruptedMessageException() {
+ def testCorruptMessage(): Unit = {
val partition = new TopicPartition("topic", 0)
- val fetcherThread = new CorruptingFetcherThread("test", "client", new
BrokerEndPoint(0, "localhost", 9092),
- fetchBackOffMs = 1)
- fetcherThread.start()
+ val fetcher = new MockFetcherThread {
+ var fetchedOnce = false
+ override def fetchFromLeader(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] = {
+ val fetchedData = super.fetchFromLeader(fetchRequest)
+ if (!fetchedOnce) {
+ val records = fetchedData.head._2.records.asInstanceOf[MemoryRecords]
+ val buffer = records.buffer()
+ buffer.putInt(15, buffer.getInt(15) ^ 23422)
+ buffer.putInt(30, buffer.getInt(30) ^ 93242)
+ fetchedOnce = true
+ }
+ fetchedData
+ }
+ }
+
+ fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+ fetcher.addPartitions(Map(partition -> 0L))
- // Add one partition for fetching
- fetcherThread.addPartitions(Map(partition -> 0L))
+ val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+ new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+ val leaderState = MockFetcherThread.PartitionState(Seq(batch),
highWatermark = 2L)
+ fetcher.setLeaderState(partition, leaderState)
- // Wait until fetcherThread finishes the work
- TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed
waiting for fetcherThread to finish the work")
+ fetcher.doWork() // fails with corrupt record
+ fetcher.doWork() // should succeed
+
+ val replicaState = fetcher.replicaPartitionState(partition)
+ assertEquals(2L, replicaState.logEndOffset)
+ }
- fetcherThread.shutdown()
+ object MockFetcherThread {
+ class PartitionState(var log: mutable.Buffer[RecordBatch],
+ var logStartOffset: Long,
+ var logEndOffset: Long,
+ var highWatermark: Long)
+
+ object PartitionState {
+ def apply(log: Seq[RecordBatch], highWatermark: Long): PartitionState = {
+ val logStartOffset = log.headOption.map(_.baseOffset).getOrElse(0L)
+ val logEndOffset = log.lastOption.map(_.nextOffset).getOrElse(0L)
+ new PartitionState(log.toBuffer, logStartOffset, logEndOffset,
highWatermark)
+ }
- // The fetcherThread should have fetched two normal messages
- assertTrue(fetcherThread.logEndOffset == 2)
+ def apply(): PartitionState = {
+ apply(Seq(), 0L)
+ }
+ }
}
- class CorruptingFetcherThread(name: String,
- clientId: String,
- sourceBroker: BrokerEndPoint,
- fetchBackOffMs: Int = 0)
- extends DummyFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
+ class MockFetcherThread(val replicaId: Int = 0,
+ val leaderId: Int = 1,
+ isUncleanLeaderElectionAllowed: Boolean = true)
+ extends AbstractFetcherThread("mock-fetcher",
+ clientId = "mock-fetcher",
+ sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port =
Random.nextInt())) {
- @volatile var logEndOffset = 0L
- @volatile var fetchCount = 0
+ import MockFetcherThread.PartitionState
- private val normalPartitionDataSet = List[PartitionData[Records]](
- new PartitionData(Errors.NONE, 0L, 0L, 0L, Seq.empty.asJava,
- MemoryRecords.withRecords(0L, CompressionType.NONE, new
SimpleRecord("hello".getBytes))),
- new PartitionData(Errors.NONE, 0L, 0L, 0L, Seq.empty.asJava,
- MemoryRecords.withRecords(1L, CompressionType.NONE, new
SimpleRecord("hello".getBytes)))
- )
+ private val replicaPartitionStates = mutable.Map[TopicPartition,
PartitionState]()
+ private val leaderPartitionStates = mutable.Map[TopicPartition,
PartitionState]()
+
+ def setLeaderState(topicPartition: TopicPartition, state: PartitionState):
Unit = {
+ leaderPartitionStates.put(topicPartition, state)
+ }
+
+ def setReplicaState(topicPartition: TopicPartition, state:
PartitionState): Unit = {
+ replicaPartitionStates.put(topicPartition, state)
+ }
+
+ def replicaPartitionState(topicPartition: TopicPartition): PartitionState
= {
+ replicaPartitionStates.getOrElse(topicPartition,
+ throw new IllegalArgumentException(s"Unknown partition
$topicPartition"))
+ }
+
+ def leaderPartitionState(topicPartition: TopicPartition): PartitionState =
{
+ leaderPartitionStates.getOrElse(topicPartition,
+ throw new IllegalArgumentException(s"Unknown partition
$topicPartition"))
+ }
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
- partitionData: PD,
- records: MemoryRecords): Unit = {
+ partitionData: PD):
Option[LogAppendInfo] = {
+ val state = replicaPartitionState(topicPartition)
+
// Throw exception if the fetchOffset does not match the fetcherThread
partition state
- if (fetchOffset != logEndOffset)
- throw new RuntimeException(
- "Offset mismatch for partition %s: fetched offset = %d, log end
offset = %d."
- .format(topicPartition, fetchOffset, logEndOffset))
+ if (fetchOffset != state.logEndOffset)
+ throw new RuntimeException(s"Offset mismatch for partition
$topicPartition: " +
+ s"fetched offset = $fetchOffset, log end offset =
${state.logEndOffset}.")
// Now check message's crc
- for (batch <- records.batches.asScala) {
+ val batches = partitionData.records.batches.asScala
+ var maxTimestamp = RecordBatch.NO_TIMESTAMP
+ var offsetOfMaxTimestamp = -1L
+ var lastOffset = state.logEndOffset
+
+ for (batch <- batches) {
batch.ensureValid()
- logEndOffset = batch.nextOffset
+ if (batch.maxTimestamp > maxTimestamp) {
+ maxTimestamp = batch.maxTimestamp
+ offsetOfMaxTimestamp = batch.baseOffset
+ }
+ state.log.append(batch)
+ state.logEndOffset = batch.nextOffset
+ lastOffset = batch.lastOffset
+ }
+
+ state.logStartOffset = partitionData.logStartOffset
+ state.highWatermark = partitionData.highWatermark
+
+ Some(LogAppendInfo(firstOffset = Some(fetchOffset),
+ lastOffset = lastOffset,
+ maxTimestamp = maxTimestamp,
+ offsetOfMaxTimestamp = offsetOfMaxTimestamp,
+ logAppendTime = Time.SYSTEM.milliseconds(),
+ logStartOffset = state.logStartOffset,
+ recordConversionStats = RecordConversionStats.EMPTY,
+ sourceCodec = NoCompressionCodec,
+ targetCodec = NoCompressionCodec,
+ shallowCount = batches.size,
+ validBytes = partitionData.records.sizeInBytes,
+ offsetsMonotonic = true,
+ lastOffsetOfFirstBatch =
batches.headOption.map(_.lastOffset).getOrElse(-1)))
+ }
+
+ override def truncate(topicPartition: TopicPartition, truncationState:
OffsetTruncationState): Unit = {
+ val state = replicaPartitionState(topicPartition)
+ state.log = state.log.takeWhile { batch =>
+ batch.lastOffset < truncationState.offset
+ }
+ state.logEndOffset = state.log.lastOption.map(_.lastOffset +
1).getOrElse(state.logStartOffset)
+ state.highWatermark = math.min(state.highWatermark, state.logEndOffset)
+ }
+
+ override def truncateFullyAndStartAt(topicPartition: TopicPartition,
offset: Long): Unit = {
+ val state = replicaPartitionState(topicPartition)
+ state.log.clear()
+ state.logStartOffset = offset
+ state.logEndOffset = offset
+ state.highWatermark = offset
+ }
+
+ override def buildFetch(partitionMap: Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+ val fetchData = mutable.Map.empty[TopicPartition,
FetchRequest.PartitionData]
+ partitionMap.foreach { case (partition, state) =>
+ if (state.isReadyForFetch) {
+ val replicaState = replicaPartitionState(partition)
+ fetchData.put(partition, new
FetchRequest.PartitionData(state.fetchOffset, replicaState.logStartOffset, 1024
* 1024))
+ }
}
+ val fetchRequest =
FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1,
fetchData.asJava)
+ ResultWithPartitions(Some(fetchRequest), Set.empty)
}
- override protected def fetch(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] = {
- fetchCount += 1
- // Set the first fetch to get a corrupted message
- if (fetchCount == 1) {
- val record = new SimpleRecord("hello".getBytes())
- val records = MemoryRecords.withRecords(CompressionType.NONE, record)
- val buffer = records.buffer
-
- // flip some bits in the message to ensure the crc fails
- buffer.putInt(15, buffer.getInt(15) ^ 23422)
- buffer.putInt(30, buffer.getInt(30) ^ 93242)
- fetchRequest.fetchData.asScala.mapValues(_ => new
PartitionData[Records](Errors.NONE, 0L, 0L, 0L,
- Seq.empty.asJava, records)).toSeq
- } else {
- // Then, the following fetches get the normal data
- fetchRequest.fetchData.asScala.mapValues(v =>
normalPartitionDataSet(v.fetchOffset.toInt)).toSeq
+ override def isUncleanLeaderElectionAllowed(topicPartition:
TopicPartition): Boolean = {
+ isUncleanLeaderElectionAllowed
+ }
+
+ override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
+ val state = replicaPartitionState(topicPartition)
+
state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
+ }
+
+ override def logEndOffset(topicPartition: TopicPartition): Long =
replicaPartitionState(topicPartition).logEndOffset
+
+ override def endOffsetForEpoch(topicPartition: TopicPartition, epoch:
Int): Option[OffsetAndEpoch] = {
+ lookupEndOffsetForEpoch(epoch, replicaPartitionState(topicPartition))
+ }
+
+ private def lookupEndOffsetForEpoch(epoch: Int, partitionState:
PartitionState): Option[OffsetAndEpoch] = {
+ var epochLowerBound = EpochEndOffset.UNDEFINED_EPOCH
+ for (batch <- partitionState.log) {
+ if (batch.partitionLeaderEpoch > epoch) {
+ return Some(OffsetAndEpoch(batch.baseOffset, epochLowerBound))
+ }
+ epochLowerBound = batch.partitionLeaderEpoch
}
+ None
}
- override protected def buildFetch(partitionMap:
collection.Map[TopicPartition, PartitionFetchState]):
ResultWithPartitions[Option[FetchRequest.Builder]] = {
- val requestMap = new mutable.HashMap[TopicPartition, Long]
- partitionMap.foreach { case (topicPartition, partitionFetchState) =>
- // Add backoff delay check
- if (partitionFetchState.isReadyForFetch)
- requestMap.put(topicPartition, partitionFetchState.fetchOffset)
+ override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]):
Map[TopicPartition, EpochEndOffset] = {
+ val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]()
+ partitions.foreach { case (partition, epoch) =>
+ val state = leaderPartitionState(partition)
+ val epochEndOffset = lookupEndOffsetForEpoch(epoch, state) match {
+ case Some(OffsetAndEpoch(offset, epoch)) =>
+ new EpochEndOffset(Errors.NONE, epoch, offset)
+ case None =>
+ new EpochEndOffset(Errors.NONE, EpochEndOffset.UNDEFINED_EPOCH,
EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
+ }
+ endOffsets.put(partition, epochEndOffset)
}
- ResultWithPartitions(Some(fetchRequestBuilder(partitionMap)), Set())
+ endOffsets
}
+ override def fetchFromLeader(fetchRequest: FetchRequest.Builder):
Seq[(TopicPartition, PD)] = {
+ fetchRequest.fetchData.asScala.map { case (partition, fetchData) =>
+ val state = leaderPartitionState(partition)
+ val (error, records) = if (fetchData.fetchOffset > state.logEndOffset
|| fetchData.fetchOffset < state.logStartOffset) {
+ (Errors.OFFSET_OUT_OF_RANGE, MemoryRecords.EMPTY)
+ } else {
+ // for simplicity, we fetch only one batch at a time
+ val records = state.log.find(_.baseOffset >= fetchData.fetchOffset)
match {
+ case Some(batch) =>
+ val buffer = ByteBuffer.allocate(batch.sizeInBytes())
+ batch.writeTo(buffer)
+ buffer.flip()
+ MemoryRecords.readableRecords(buffer)
+
+ case None =>
+ MemoryRecords.EMPTY
+ }
+
+ (Errors.NONE, records)
+ }
+
+ (partition, new PD(error, state.highWatermark, state.highWatermark,
state.logStartOffset,
+ List.empty.asJava, records))
+ }.toSeq
+ }
+
+ override protected def fetchEarliestOffsetFromLeader(topicPartition:
TopicPartition): Long = {
+ leaderPartitionState(topicPartition).logStartOffset
+ }
+
+ override protected def fetchLatestOffsetFromLeader(topicPartition:
TopicPartition): Long = {
+ leaderPartitionState(topicPartition).logEndOffset
+ }
}
}