This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 882b0ec  KAFKA-3978; Ensure high watermark is always positive (#4695)
882b0ec is described below

commit 882b0ec98d12352442aaf84cb3199f4eb32f59fc
Author: Dong Lin <lindon...@users.noreply.github.com>
AuthorDate: Tue Mar 13 22:52:59 2018 -0700

    KAFKA-3978; Ensure high watermark is always positive (#4695)
    
    Partition high watermark may become -1 if the initial value is out of 
range. This situation can occur during partition reassignment, for example. The 
bug is fixed and validated with a unit test in this patch.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson 
<ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 ++++-
 core/src/main/scala/kafka/cluster/Replica.scala    | 14 +++++++++--
 core/src/main/scala/kafka/log/Log.scala            |  8 +++---
 .../admin/ReassignPartitionsClusterTest.scala      | 29 ++++++++++++++++++++--
 4 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 1e3ab75..e038b58 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -397,7 +397,11 @@ class Partition(val topic: String,
     }.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new 
LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || 
oldHighWatermark.onOlderSegment(newHighWatermark)) {
+
+    // Ensure that the high watermark increases monotonically. We also update 
the high watermark when the new
+    // offset metadata is on a newer segment, which occurs whenever the log is 
rolled to a new segment.
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && 
oldHighWatermark.onOlderSegment(newHighWatermark))) {
       leaderReplica.highWatermark = newHighWatermark
       debug(s"High watermark updated to $newHighWatermark")
       true
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index 6f32a59..979bc30 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -138,6 +138,9 @@ class Replica(val brokerId: Int,
 
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
+      if (newHighWatermark.messageOffset < 0)
+        throw new IllegalArgumentException("High watermark offset should be 
non-negative")
+
       highWatermarkMetadata = newHighWatermark
       log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
       trace(s"Setting high watermark for replica $brokerId partition 
$topicPartition to [$newHighWatermark]")
@@ -165,9 +168,16 @@ class Replica(val brokerId: Int,
       s"non-local replica $brokerId"))
   }
 
-  def convertHWToLocalOffsetMetadata() = {
+  /*
+   * Convert hw to local offset metadata by reading the log at the hw offset.
+   * If the hw offset is out of range, return the first offset of the first 
log segment as the offset metadata.
+   */
+  def convertHWToLocalOffsetMetadata() {
     if (isLocal) {
-      highWatermarkMetadata = 
log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+      highWatermarkMetadata = 
log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+        val firstOffset = log.get.logSegments.head.baseOffset
+        new LogOffsetMetadata(firstOffset, firstOffset, 0)
+      }
     } else {
       throw new KafkaException(s"Should not construct complete high watermark 
on partition $topicPartition's non-local replica $brokerId")
     }
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 9ea3e3e..eb455ef 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1111,14 +1111,14 @@ class Log(@volatile var dir: File,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, return unknown offset metadata
+   * If the message offset is out of range, return None to the caller.
    */
-  def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
     try {
       val fetchDataInfo = readUncommitted(offset, 1)
-      fetchDataInfo.fetchOffsetMetadata
+      Some(fetchDataInfo.fetchOffsetMetadata)
     } catch {
-      case _: OffsetOutOfRangeException => 
LogOffsetMetadata.UnknownOffsetMetadata
+      case _: OffsetOutOfRangeException => None
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 7002e84..19a3a2f 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -27,13 +27,12 @@ import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
 import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
-import org.apache.kafka.common.TopicPartitionReplica
+import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.Seq
 import scala.util.Random
-
 import java.io.File
 
 class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
@@ -79,6 +78,32 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def testHwAfterPartitionReassignment(): Unit = {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101, 102))
+    adminClient = createAdminClient(servers)
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    val topicPartition = new TopicPartition(topicName, 0)
+    val leaderServer = servers.find(_.config.brokerId == 100).get
+    
leaderServer.replicaManager.logManager.truncateFullyAndStartAt(topicPartition, 
100L)
+
+    val topicJson: String = 
s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101,
 102]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), 
topicJson, NoThrottle)
+
+    val newLeaderServer = servers.find(_.config.brokerId == 101).get
+
+    TestUtils.waitUntilTrue (
+      () => 
newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+      "broker 101 should be the new leader", pause = 1L
+    )
+
+    assertEquals(100, 
newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset)
+    servers.foreach(server => waitUntilTrue(() => 
server.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset
 == 100, ""))
+  }
+
+
+  @Test
   def shouldMoveSinglePartition(): Unit = {
     //Given a single replica on server 100
     startBrokers(Seq(100, 101))

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to