This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 77137e9 KAFKA-3978; Ensure high watermark is always positive (#4695)
77137e9 is described below
commit 77137e993b15ad4272972f05f61e7faee36a1914
Author: Dong Lin <[email protected]>
AuthorDate: Tue Mar 13 22:52:59 2018 -0700
KAFKA-3978; Ensure high watermark is always positive (#4695)
Partition high watermark may become -1 if the initial value is out of
range. This situation can occur during partition reassignment, for example. The
bug was fixed and validated with a unit test in this patch.
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson
<[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 6 ++++-
core/src/main/scala/kafka/cluster/Replica.scala | 14 ++++++++++--
core/src/main/scala/kafka/log/Log.scala | 8 +++----
.../admin/ReassignPartitionsClusterTest.scala | 26 ++++++++++++++++++++++
4 files changed, 47 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b97671..68faf00 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -460,7 +460,11 @@ class Partition(val topic: String,
}.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min(new
LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark
- if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
oldHighWatermark.onOlderSegment(newHighWatermark)) {
+
+ // Ensure that the high watermark increases monotonically. We also update
the high watermark when the new
+ // offset metadata is on a newer segment, which occurs whenever the log is
rolled to a new segment.
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+ (oldHighWatermark.messageOffset == newHighWatermark.messageOffset &&
oldHighWatermark.onOlderSegment(newHighWatermark))) {
leaderReplica.highWatermark = newHighWatermark
debug(s"High watermark updated to $newHighWatermark")
true
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala
b/core/src/main/scala/kafka/cluster/Replica.scala
index e41e389..030e5b7 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -138,6 +138,9 @@ class Replica(val brokerId: Int,
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
if (isLocal) {
+ if (newHighWatermark.messageOffset < 0)
+ throw new IllegalArgumentException("High watermark offset should be
non-negative")
+
highWatermarkMetadata = newHighWatermark
log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
trace(s"Setting high watermark for replica $brokerId partition
$topicPartition to [$newHighWatermark]")
@@ -165,9 +168,16 @@ class Replica(val brokerId: Int,
s"non-local replica $brokerId"))
}
- def convertHWToLocalOffsetMetadata() = {
+ /*
+ * Convert hw to local offset metadata by reading the log at the hw offset.
+ * If the hw offset is out of range, return the first offset of the first
log segment as the offset metadata.
+ */
+ def convertHWToLocalOffsetMetadata() {
if (isLocal) {
- highWatermarkMetadata =
log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+ highWatermarkMetadata =
log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+ val firstOffset = log.get.logSegments.head.baseOffset
+ new LogOffsetMetadata(firstOffset, firstOffset, 0)
+ }
} else {
throw new KafkaException(s"Should not construct complete high watermark
on partition $topicPartition's non-local replica $brokerId")
}
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 257dd8f..f0050f5 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File,
/**
* Given a message offset, find its corresponding offset metadata in the log.
- * If the message offset is out of range, return unknown offset metadata
+ * If the message offset is out of range, return None to the caller.
*/
- def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+ def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
try {
val fetchDataInfo = readUncommitted(offset, 1)
- fetchDataInfo.fetchOffsetMetadata
+ Some(fetchDataInfo.fetchOffsetMetadata)
} catch {
- case _: OffsetOutOfRangeException =>
LogOffsetMetadata.UnknownOffsetMetadata
+ case _: OffsetOutOfRangeException => None
}
}
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 2a24a37..0c41519 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -77,6 +77,32 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
}
@Test
+ def testHwAfterPartitionReassignment(): Unit = {
+ //Given a single replica on server 100
+ startBrokers(Seq(100, 101, 102))
+ adminClient = createAdminClient(servers)
+ createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers)
+
+ val topicPartition = new TopicPartition(topicName, 0)
+ val leaderServer = servers.find(_.config.brokerId == 100).get
+
leaderServer.replicaManager.logManager.truncateFullyAndStartAt(topicPartition,
100L, false)
+
+ val topicJson: String =
s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101,
102]}]}"""
+ ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient),
topicJson, NoThrottle)
+
+ val newLeaderServer = servers.find(_.config.brokerId == 101).get
+
+ TestUtils.waitUntilTrue (
+ () =>
newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+ "broker 101 should be the new leader", pause = 1L
+ )
+
+ assertEquals(100,
newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset)
+ servers.foreach(server => waitUntilTrue(() =>
server.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset
== 100, ""))
+ }
+
+
+ @Test
def shouldMoveSinglePartition(): Unit = {
//Given a single replica on server 100
startBrokers(Seq(100, 101))
--
To stop receiving notification emails like this one, please contact
[email protected].