This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 0160a3ca798 KAFKA-20634: Spurious HighWatermarkUpdate failed errors in the group coordinator after partition leadership change (#22444)
0160a3ca798 is described below

commit 0160a3ca79898e5eaec490277d70fe6a84afe2a7
Author: David Jacot <[email protected]>
AuthorDate: Wed Jun 3 17:40:52 2026 +0200

    KAFKA-20634: Spurious HighWatermarkUpdate failed errors in the group coordinator after partition leadership change (#22444)
    
    When a `__consumer_offsets` partition transitions to follower, its local
    log is truncated and re-replicated from the new leader. The group
    coordinator hosting the partition remains active until it is unloaded
    asynchronously. During that window, the partition's high watermark
    advances again over records that this coordinator did not write, while
    the coordinator still holds in-memory state (and pending deferred
    operations) for its own records that were truncated and never durably
    committed.
    
    Applying such a high watermark has two consequences. It can violate the
    invariants of the snapshot registry and fail the `HighWatermarkUpdate`
    event, logging a spurious error such as "Execution of
    HighWatermarkUpdate failed due to New committed offset X of
    __consumer_offsets-N must be less than or equal to Y". More importantly,
    when it does not fail, it advances the committed offset over the
    coordinator's uncommitted state and completes the corresponding deferred
    writes with a success response, even though those records were lost. A
    client can therefore receive a successful offset-commit acknowledgment
    for a commit that is silently dropped once the new coordinator takes
    over.
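    A minimal, hypothetical model of these two failure modes follows. It
    assumes the coordinator tracks its last written offset, keeps each
    pending acknowledgment keyed by the log end offset that write must
    reach, and rejects a committed offset beyond its last written offset.
    None of the class or method names below are Kafka APIs.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a coordinator partition's deferred writes.
// Not Kafka's CoordinatorRuntime; names and semantics are illustrative only.
class DeferredWritesModel {
    private long lastWrittenOffset = 0L;
    private long lastCommittedOffset = 0L;
    // Pending acknowledgments keyed by the log end offset the write must reach.
    private final NavigableMap<Long, CompletableFuture<String>> pending = new TreeMap<>();

    // Records a local write ending at endOffset and returns its acknowledgment.
    CompletableFuture<String> write(long endOffset) {
        lastWrittenOffset = endOffset;
        CompletableFuture<String> ack = new CompletableFuture<>();
        pending.put(endOffset, ack);
        return ack;
    }

    // Applies a high watermark: commits up to offset and acknowledges every
    // pending write whose end offset is at or below it.
    void onHighWatermarkUpdated(long offset) {
        if (offset > lastWrittenOffset) {
            // Analogue of the spurious "New committed offset X ... must be
            // less than or equal to Y" error.
            throw new IllegalStateException("New committed offset " + offset
                + " must be less than or equal to " + lastWrittenOffset);
        }
        lastCommittedOffset = Math.max(lastCommittedOffset, offset);
        NavigableMap<Long, CompletableFuture<String>> done = pending.headMap(offset, true);
        done.values().forEach(ack -> ack.complete("SUCCESS"));
        done.clear(); // clearing the view removes the entries from pending
    }

    // Unloading completes whatever is still pending with NOT_COORDINATOR.
    void unload() {
        pending.values().forEach(ack -> ack.complete("NOT_COORDINATOR"));
        pending.clear();
    }

    long lastCommittedOffset() {
        return lastCommittedOffset;
    }
}
```

    For example, with writes ending at offsets 10 and 20, a high watermark
    of 10 acknowledges the first write. If the log is then truncated to 10
    and the new leader's records push the high watermark to 20, the second
    write is acknowledged with SUCCESS although its records were lost; a
    high watermark of 25 would instead throw.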
    
    This patch gates high watermark propagation in
    `CoordinatorPartitionWriter.ListenerAdapter` on the partition's
    leadership. The adapter stops forwarding high watermark updates once the
    partition transitions to follower, is deleted, or fails. The partition
    signals these transitions (via `PartitionListener`) before its fetcher
    is restarted (see `ReplicaManager#applyDelta`), i.e. before any such
    high watermark can be produced, so the coordinator never observes a high
    watermark that it should not apply. The pending deferred operations then
    remain in place and are failed with `NOT_COORDINATOR` when the
    coordinator is unloaded, so clients correctly retry against the new
    coordinator.
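    The gating can be sketched in Java as a simplified analogue of the
    Scala `ListenerAdapter` in this patch, not the patch itself. The two
    interfaces are hypothetical stand-ins for `PartitionWriter.Listener`
    and the broker's `PartitionListener`, and partitions are plain strings
    instead of `TopicPartition`. An `AtomicBoolean` is used on the
    assumption that leadership transitions and high watermark updates may
    be signalled from different threads.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for PartitionWriter.Listener.
interface HighWatermarkListener {
    void onHighWatermarkUpdated(String tp, long offset);
}

// Hypothetical stand-in for the broker-side PartitionListener callbacks.
interface PartitionEvents {
    void onHighWatermarkUpdated(String tp, long offset);
    void onBecomingFollower(String tp);
    void onFailed(String tp);
    void onDeleted(String tp);
}

// Forwards high watermark updates only during the current leadership tenure.
class GatedListenerAdapter implements PartitionEvents {
    private final HighWatermarkListener listener;
    // Starts active; once cleared it is never set again, so the adapter is retired.
    private final AtomicBoolean active = new AtomicBoolean(true);

    GatedListenerAdapter(HighWatermarkListener listener) {
        this.listener = listener;
    }

    @Override
    public void onHighWatermarkUpdated(String tp, long offset) {
        if (active.get()) {
            listener.onHighWatermarkUpdated(tp, offset);
        }
    }

    @Override
    public void onBecomingFollower(String tp) {
        active.set(false);
    }

    @Override
    public void onFailed(String tp) {
        active.set(false);
    }

    @Override
    public void onDeleted(String tp) {
        active.set(false);
    }
}
```

    The check and the forwarding are not atomic with respect to a
    concurrent transition. Following the reasoning above, this should be
    harmless: the transition is signalled before the fetcher restarts, so
    an update that slips through was still produced over this broker's own
    records.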
    
    Gating on leadership rather than inspecting the offset is deliberate:
    after truncation an offset can still have a snapshot in the registry
    while holding the new leader's data, so no offset-based check can tell
    whether a high watermark is safe to apply.
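    A small, hypothetical model shows why such a check fails. It assumes
    the coordinator keeps a snapshot at the end offset of each of its own
    writes, and tags each log entry with the leader epoch that wrote it
    purely so the test can tell whose data is stored; the class and its
    methods are illustrative only.

```java
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of a local log and the coordinator's snapshot offsets.
class OffsetCheckPitfall {
    // offset -> leader epoch of the record stored at that offset
    final TreeMap<Long, Integer> log = new TreeMap<>();
    // end offsets at which the coordinator snapshotted its own writes
    final TreeSet<Long> snapshots = new TreeSet<>();

    private long logEndOffset() {
        return log.isEmpty() ? 0L : log.lastKey() + 1;
    }

    // Appends records as leader and snapshots the new log end offset.
    void appendAsLeader(int epoch, int count) {
        replicate(epoch, count);
        snapshots.add(logEndOffset());
    }

    // Appends records fetched from another leader; no snapshot is taken.
    void replicate(int epoch, int count) {
        long start = logEndOffset();
        for (long o = start; o < start + count; o++) {
            log.put(o, epoch);
        }
    }

    // Removes every record at or above offset (follower truncation).
    void truncateTo(long offset) {
        log.tailMap(offset, true).clear();
    }

    // The offset-based check that cannot work: "is there a snapshot here?"
    boolean offsetCheckPasses(long highWatermark) {
        return snapshots.contains(highWatermark);
    }

    // What actually matters: were all records in [from, to) written in ownEpoch?
    boolean writtenInEpoch(long from, long to, int ownEpoch) {
        return log.subMap(from, true, to, false).values().stream()
            .allMatch(epoch -> epoch == ownEpoch);
    }
}
```

    For instance, after writing offsets 0 to 14 in epoch 5 (snapshots at 10
    and 15), truncating to 10 and replicating ten records from epoch 6,
    `offsetCheckPasses(15)` is still true while `writtenInEpoch(10, 15, 5)`
    is false. The epoch tags exist only in this model; gating on leadership
    sidesteps the question entirely.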
    
    Reviewers: Sean Quah <[email protected]>
---
 .../common/runtime/PartitionWriter.java            | 29 ++++++++++++-
 .../group/CoordinatorPartitionWriter.scala         | 28 +++++++++++-
 .../group/CoordinatorPartitionWriterTest.scala     | 50 ++++++++++++++++++++++
 .../unit/kafka/server/ReplicaManagerTest.scala     | 47 ++++++++++++++++++++
 4 files changed, 151 insertions(+), 3 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
index 2d974867c63..54726fd5b29 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
@@ -32,9 +32,29 @@ public interface PartitionWriter {
 
     /**
      * Listener allowing to listen to high watermark changes. This is meant
-     * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.
+     * to be used in conjunction with {@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords, short)}.
+     * <p>
+     * A registered listener observes a single leadership tenure of the partition. It is
+     * delivered high watermark updates only while this broker is the leader of the
+     * partition. Once the partition is no longer led by this broker (it transitions to
+     * follower, is deleted, or fails), the listener is permanently retired: no further
+     * updates are delivered to it, even if the broker later regains leadership. A new
+     * listener must be registered to observe a subsequent tenure.
+     * <p>
+     * Retiring the listener is required because, after a leadership change, the local log
+     * can be truncated and re-replicated from the new leader, so a high watermark observed
+     * afterwards may advance over records that this broker never wrote. This guarantees
+     * that every delivered update advances only over records that this broker wrote as
+     * leader.
      */
     interface Listener {
+        /**
+         * Called when the high watermark of the partition advances. Only invoked while
+         * this broker is the leader of the partition (see {@link Listener}).
+         *
+         * @param tp        The topic partition.
+         * @param offset    The new high watermark.
+         */
         void onHighWatermarkUpdated(
             TopicPartition tp,
             long offset
@@ -42,7 +62,12 @@ public interface PartitionWriter {
     }
 
     /**
-     * Register a {{@link Listener}}.
+     * Register a {@link Listener}.
+     * <p>
+     * The listener observes only the current leadership tenure: as described on
+     * {@link Listener}, it stops receiving updates once the partition is no longer led by
+     * this broker and is not re-armed if leadership is regained. A new listener must be
+     * registered to observe a later tenure.
      *
      * @param tp        The partition to register the listener to.
      * @param listener  The listener.
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 32937821527..ad6b0158a02 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -28,20 +28,46 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
 
 import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.Map
 
 /**
  * ListenerAdapter adapts the PartitionListener interface to the
  * PartitionWriter.Listener interface.
+ *
+ * This upholds the PartitionWriter.Listener contract that high watermark updates are
+ * delivered only while the partition is led by this broker. When the partition
+ * transitions to follower, is deleted or fails, its local log can be truncated and
+ * re-replicated from the new leader, so the high watermark would advance over records
+ * that this broker did not write; propagating those updates would corrupt the
+ * coordinator's committed state. The partition notifies these transitions before its
+ * fetcher is restarted (see ReplicaManager#applyDelta), i.e. before any such update can
+ * be produced, so gating on this flag is sufficient to stop them.
  */
 private[group] class ListenerAdapter(
   val listener: PartitionWriter.Listener
 ) extends PartitionListener {
+  private val active = new AtomicBoolean(true)
+
   override def onHighWatermarkUpdated(
     tp: TopicPartition,
     offset: Long
   ): Unit = {
-    listener.onHighWatermarkUpdated(tp, offset)
+    if (active.get()) {
+      listener.onHighWatermarkUpdated(tp, offset)
+    }
+  }
+
+  override def onBecomingFollower(tp: TopicPartition): Unit = {
+    active.set(false)
+  }
+
+  override def onFailed(tp: TopicPartition): Unit = {
+    active.set(false)
+  }
+
+  override def onDeleted(tp: TopicPartition): Unit = {
+    active.set(false)
   }
 
   override def equals(that: Any): Boolean = that match {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 57fab6b6865..6d6bcd4154e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -68,6 +68,56 @@ class CoordinatorPartitionWriterTest {
     )
   }
 
+  @Test
+  def testListenerAdapterPropagatesHighWatermarkUpdates(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val updates = new util.ArrayList[Long]()
+    val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+      override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+    })
+
+    adapter.onHighWatermarkUpdated(tp, 10L)
+    adapter.onHighWatermarkUpdated(tp, 20L)
+
+    assertEquals(util.List.of(10L, 20L), updates)
+  }
+
+  @Test
+  def testListenerAdapterStopsPropagatingAfterBecomingFollower(): Unit = {
+    assertHighWatermarkPropagationStops(_.onBecomingFollower(_))
+  }
+
+  @Test
+  def testListenerAdapterStopsPropagatingAfterFailed(): Unit = {
+    assertHighWatermarkPropagationStops(_.onFailed(_))
+  }
+
+  @Test
+  def testListenerAdapterStopsPropagatingAfterDeleted(): Unit = {
+    assertHighWatermarkPropagationStops(_.onDeleted(_))
+  }
+
+  /**
+   * Verifies that no high watermark update is propagated to the wrapped listener
+   * once the given transition has been signalled. Such updates are not safe to
+   * apply because the partition is no longer led by this broker.
+   */
+  private def assertHighWatermarkPropagationStops(
+    transition: (ListenerAdapter, TopicPartition) => Unit
+  ): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val updates = new util.ArrayList[Long]()
+    val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+      override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+    })
+
+    adapter.onHighWatermarkUpdated(tp, 10L)
+    transition(adapter, tp)
+    adapter.onHighWatermarkUpdated(tp, 20L)
+
+    assertEquals(util.List.of(10L), updates)
+  }
+
   @Test
   def testConfig(): Unit = {
     val tp = new TopicPartition("foo", 0)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 753b831b594..2079c9ba873 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -5409,6 +5409,53 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testPartitionListenerWhenPartitionBecomesFollower(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val leaderEpoch = 5
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+      val listener = new MockPartitionListener
+      listener.verify()
+
+      // Broker 0 becomes leader of the partition.
+      val leaderDelta = createLeaderDelta(
+        topicId = topicId,
+        partition = tp,
+        leaderId = 0,
+        replicas = replicas,
+        isr = replicas,
+        leaderEpoch = leaderEpoch
+      )
+      replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))
+
+      // Register a listener.
+      assertTrue(replicaManager.maybeAddListener(tp, listener))
+      listener.verify()
+
+      // Broker 0 transitions to follower of the partition with broker 1 as the new
+      // leader. The listener is notified that the partition is becoming a follower.
+      // This happens before the follower starts fetching from the new leader, hence
+      // before any high watermark update reflecting the new leader's records.
+      val followerDelta = createFollowerDelta(
+        topicId = topicId,
+        partition = tp,
+        followerId = 0,
+        leaderId = 1,
+        leaderEpoch = leaderEpoch + 1
+      )
+      replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply()))
+
+      listener.verify(expectedFollower = true)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0): TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(TopicsImage.EMPTY)
