This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 0cc4479dc53 KAFKA-20634: Spurious HighWatermarkUpdate failed errors in
the group coordinator after partition leadership change (#22444)
0cc4479dc53 is described below
commit 0cc4479dc536cc275c4e0e771f5f38b730aa6f6f
Author: David Jacot <[email protected]>
AuthorDate: Wed Jun 3 17:40:52 2026 +0200
KAFKA-20634: Spurious HighWatermarkUpdate failed errors in the group
coordinator after partition leadership change (#22444)
When a `__consumer_offsets` partition transitions to follower, its local
log is truncated and re-replicated from the new leader. The group
coordinator hosting the partition remains active until it is unloaded
asynchronously. During that window, the partition's high watermark
advances again over records that this coordinator did not write, while
the coordinator still holds in-memory state (and pending deferred
operations) for its own records that were truncated and never durably
committed.
Applying such a high watermark has two consequences. It can violate the
invariants of the snapshot registry and fail the `HighWatermarkUpdate`
event, logging a spurious error such as "Execution of
HighWatermarkUpdate failed due to New committed offset X of
__consumer_offsets-N must be less than or equal to Y". More importantly,
when it does not fail, it advances the committed offset over the
coordinator's uncommitted state and completes the corresponding deferred
writes with a success response, even though those records were lost. A
client can therefore receive a successful offset-commit acknowledgment
for a commit that is silently dropped once the new coordinator takes
over.
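
As an illustration of the two outcomes above, here is a minimal toy model in Java. It is not the coordinator runtime: the class `StaleHighWatermarkSketch`, its fields, and its methods are hypothetical names made up for this sketch, and the bookkeeping is deliberately simplified to one "last written offset" plus a map of deferred responses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model only; every name here is hypothetical and not part of Kafka.
final class StaleHighWatermarkSketch {
    // End offset of the records this coordinator believes it has written.
    private long lastWrittenOffset = 0L;
    // Deferred responses, keyed by the offset that must be committed first.
    private final TreeMap<Long, String> deferred = new TreeMap<>();
    // Requests that were answered with a success response.
    final List<String> acknowledged = new ArrayList<>();

    // Records a write of recordCount records and defers its response.
    void write(String request, int recordCount) {
        lastWrittenOffset += recordCount;
        deferred.put(lastWrittenOffset, request);
    }

    // Applies a high watermark without knowing who wrote the records below it.
    void onHighWatermark(long offset) {
        if (offset > lastWrittenOffset) {
            throw new IllegalStateException("New committed offset " + offset
                + " must be less than or equal to " + lastWrittenOffset);
        }
        // Complete every deferred response at or below the new offset.
        NavigableMap<Long, String> done = deferred.headMap(offset, true);
        acknowledged.addAll(done.values());
        done.clear();
    }

    public static void main(String[] args) {
        StaleHighWatermarkSketch coordinator = new StaleHighWatermarkSketch();
        coordinator.write("commit-A", 5);

        // Assume the log was truncated and the new leader replicated 5 of its
        // own records: the high watermark reaches 5 again, and commit-A is
        // acknowledged although its records no longer exist.
        coordinator.onHighWatermark(5);
        System.out.println("acknowledged: " + coordinator.acknowledged);

        // A high watermark beyond what this coordinator wrote fails instead.
        try {
            coordinator.onHighWatermark(8);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this model the first call corresponds to the silent false acknowledgment and the second to the spurious error; neither offset check can distinguish the coordinator's own records from the new leader's.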
This patch gates high watermark propagation in
`CoordinatorPartitionWriter.ListenerAdapter` on the partition's
leadership. The adapter stops forwarding high watermark updates once the
partition transitions to follower, is deleted, or fails. The partition
signals these transitions (via `PartitionListener`) before its fetcher
is restarted (see `ReplicaManager#applyDelta`), i.e. before any such
high watermark can be produced, so the coordinator never observes a high
watermark that it should not apply. The pending deferred operations then
remain in place and are failed with `NOT_COORDINATOR` when the
coordinator is unloaded, so clients correctly retry against the new
coordinator.
Gating on leadership rather than inspecting the offset is deliberate:
after truncation an offset can still have a snapshot in the registry
while holding the new leader's data, so no offset-based check can tell
whether a high watermark is safe to apply.
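
The gating idea can be sketched in isolation as follows. This is a simplified Java rendering under stated assumptions, not the Scala `ListenerAdapter` from the patch: the class `LeadershipGateSketch` and the use of `LongConsumer` as the downstream listener are hypothetical, and the topic partition argument is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

// Hypothetical sketch of a leadership-gated listener; not Kafka code.
final class LeadershipGateSketch {
    // True while this broker leads the partition; never set back to true.
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final LongConsumer downstream;

    LeadershipGateSketch(LongConsumer downstream) {
        this.downstream = downstream;
    }

    // Forwards the update only while the gate is still open.
    void onHighWatermarkUpdated(long offset) {
        if (active.get()) {
            downstream.accept(offset);
        }
    }

    // Each of these transitions closes the gate permanently.
    void onBecomingFollower() { active.set(false); }
    void onFailed() { active.set(false); }
    void onDeleted() { active.set(false); }

    // Returns the offsets that reached the downstream listener.
    static List<Long> demo() {
        List<Long> seen = new ArrayList<>();
        LeadershipGateSketch gate = new LeadershipGateSketch(seen::add);
        gate.onHighWatermarkUpdated(10L); // forwarded: still leader
        gate.onBecomingFollower();        // leadership lost
        gate.onHighWatermarkUpdated(20L); // dropped: gate is closed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The flag is one-way by design in this sketch: nothing reopens the gate, which mirrors the requirement that a new listener be registered for a later leadership tenure.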
Reviewers: Sean Quah <[email protected]>
---
.../common/runtime/PartitionWriter.java | 29 +++++++++++-
.../group/CoordinatorPartitionWriter.scala | 28 +++++++++++-
.../group/CoordinatorPartitionWriterTest.scala | 51 ++++++++++++++++++++++
.../unit/kafka/server/ReplicaManagerTest.scala | 47 ++++++++++++++++++++
4 files changed, 152 insertions(+), 3 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
index cc76cfd6460..f0d8d7790c2 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
@@ -32,9 +32,29 @@ public interface PartitionWriter {
/**
* Listener allowing to listen to high watermark changes. This is meant
- * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.
+ * to be used in conjunction with {@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}.
+ * <p>
+ * A registered listener observes a single leadership tenure of the partition. It is
+ * delivered high watermark updates only while this broker is the leader of the
+ * partition. Once the partition is no longer led by this broker (it transitions to
+ * follower, is deleted, or fails), the listener is permanently retired: no further
+ * updates are delivered to it, even if the broker later regains leadership. A new
+ * listener must be registered to observe a subsequent tenure.
+ * <p>
+ * Retiring the listener is required because, after a leadership change, the local log
+ * can be truncated and re-replicated from the new leader, so a high watermark observed
+ * afterwards may advance over records that this broker never wrote. This guarantees
+ * that every delivered update advances only over records that this broker wrote as
+ * leader.
*/
interface Listener {
+ /**
+ * Called when the high watermark of the partition advances. Only invoked while
+ * this broker is the leader of the partition (see {@link Listener}).
+ *
+ * @param tp The topic partition.
+ * @param offset The new high watermark.
+ */
void onHighWatermarkUpdated(
TopicPartition tp,
long offset
@@ -42,7 +62,12 @@ public interface PartitionWriter {
}
/**
- * Register a {{@link Listener}}.
+ * Register a {@link Listener}.
+ * <p>
+ * The listener observes only the current leadership tenure: as described on
+ * {@link Listener}, it stops receiving updates once the partition is no longer led by
+ * this broker and is not re-armed if leadership is regained. A new listener must be
+ * registered to observe a later tenure.
*
* @param tp The partition to register the listener to.
* @param listener The listener.
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index dbbdbb09868..f4ca4d84b42 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -28,20 +28,46 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Map
/**
* ListenerAdapter adapts the PartitionListener interface to the
* PartitionWriter.Listener interface.
+ *
+ * This upholds the PartitionWriter.Listener contract that high watermark updates are
+ * delivered only while the partition is led by this broker. When the partition
+ * transitions to follower, is deleted or fails, its local log can be truncated and
+ * re-replicated from the new leader, so the high watermark would advance over records
+ * that this broker did not write; propagating those updates would corrupt the
+ * coordinator's committed state. The partition notifies these transitions before its
+ * fetcher is restarted (see ReplicaManager#applyDelta), i.e. before any such update can
+ * be produced, so gating on this flag is sufficient to stop them.
*/
private[group] class ListenerAdapter(
val listener: PartitionWriter.Listener
) extends PartitionListener {
+ private val active = new AtomicBoolean(true)
+
override def onHighWatermarkUpdated(
tp: TopicPartition,
offset: Long
): Unit = {
- listener.onHighWatermarkUpdated(tp, offset)
+ if (active.get()) {
+ listener.onHighWatermarkUpdated(tp, offset)
+ }
+ }
+
+ override def onBecomingFollower(tp: TopicPartition): Unit = {
+ active.set(false)
+ }
+
+ override def onFailed(tp: TopicPartition): Unit = {
+ active.set(false)
+ }
+
+ override def onDeleted(tp: TopicPartition): Unit = {
+ active.set(false)
}
override def equals(that: Any): Boolean = that match {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 262bc3f85b6..7a197c405c2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -34,6 +34,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
import java.nio.charset.Charset
+import java.util
import java.util.Optional
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -67,6 +68,56 @@ class CoordinatorPartitionWriterTest {
)
}
+ @Test
+ def testListenerAdapterPropagatesHighWatermarkUpdates(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val updates = new util.ArrayList[Long]()
+ val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+ override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+ })
+
+ adapter.onHighWatermarkUpdated(tp, 10L)
+ adapter.onHighWatermarkUpdated(tp, 20L)
+
+ assertEquals(util.List.of(10L, 20L), updates)
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterBecomingFollower(): Unit = {
+ assertHighWatermarkPropagationStops(_.onBecomingFollower(_))
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterFailed(): Unit = {
+ assertHighWatermarkPropagationStops(_.onFailed(_))
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterDeleted(): Unit = {
+ assertHighWatermarkPropagationStops(_.onDeleted(_))
+ }
+
+ /**
+ * Verifies that no high watermark update is propagated to the wrapped listener
+ * once the given transition has been signalled. Such updates are not safe to
+ * apply because the partition is no longer led by this broker.
+ */
+ private def assertHighWatermarkPropagationStops(
+ transition: (ListenerAdapter, TopicPartition) => Unit
+ ): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val updates = new util.ArrayList[Long]()
+ val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+ override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+ })
+
+ adapter.onHighWatermarkUpdated(tp, 10L)
+ transition(adapter, tp)
+ adapter.onHighWatermarkUpdated(tp, 20L)
+
+ assertEquals(util.List.of(10L), updates)
+ }
+
@Test
def testConfig(): Unit = {
val tp = new TopicPartition("foo", 0)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f90b7fa9e55..d5664df10da 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -5738,6 +5738,53 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testPartitionListenerWhenPartitionBecomesFollower(): Unit = {
+ val aliveBrokersIds = Seq(0, 1)
+ val leaderEpoch = 5
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+ brokerId = 0, aliveBrokersIds)
+ try {
+ val tp = new TopicPartition(topic, 0)
+ val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+ val listener = new MockPartitionListener
+ listener.verify()
+
+ // Broker 0 becomes leader of the partition.
+ val leaderDelta = createLeaderDelta(
+ topicId = topicId,
+ partition = tp,
+ leaderId = 0,
+ replicas = replicas,
+ isr = replicas,
+ leaderEpoch = leaderEpoch
+ )
+ replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))
+
+ // Register a listener.
+ assertTrue(replicaManager.maybeAddListener(tp, listener))
+ listener.verify()
+
+ // Broker 0 transitions to follower of the partition with broker 1 as the new
+ // leader. The listener is notified that the partition is becoming a follower.
+ // This happens before the follower starts fetching from the new leader, hence
+ // before any high watermark update reflecting the new leader's records.
+ val followerDelta = createFollowerDelta(
+ topicId = topicId,
+ partition = tp,
+ followerId = 0,
+ leaderId = 1,
+ leaderEpoch = leaderEpoch + 1
+ )
+ replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply()))
+
+ listener.verify(expectedFollower = true)
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)