This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 0160a3ca798 KAFKA-20634: Spurious HighWatermarkUpdate failed errors in
the group coordinator after partition leadership change (#22444)
0160a3ca798 is described below
commit 0160a3ca79898e5eaec490277d70fe6a84afe2a7
Author: David Jacot <[email protected]>
AuthorDate: Wed Jun 3 17:40:52 2026 +0200
KAFKA-20634: Spurious HighWatermarkUpdate failed errors in the group
coordinator after partition leadership change (#22444)
When a `__consumer_offsets` partition transitions to follower, its local
log is truncated and re-replicated from the new leader. The group
coordinator hosting the partition remains active until it is unloaded
asynchronously. During that window, the partition's high watermark
advances again over records that this coordinator did not write, while
the coordinator still holds in-memory state (and pending deferred
operations) for its own records that were truncated and never durably
committed.
Applying such a high watermark has two consequences. It can violate the
invariants of the snapshot registry and fail the `HighWatermarkUpdate`
event, logging a spurious error such as "Execution of
HighWatermarkUpdate failed due to New committed offset X of
__consumer_offsets-N must be less than or equal to Y". More importantly,
when it does not fail, it advances the committed offset over the
coordinator's uncommitted state and completes the corresponding deferred
writes with a success response, even though those records were lost. A
client can therefore receive a successful offset-commit acknowledgment
for a commit that is silently dropped once the new coordinator takes
over.
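Both outcomes can be reproduced with a toy model. This is a minimal sketch, assuming a shard that tracks the end offset it wrote, the last committed offset, and deferred responses keyed by the offset they wait for. `ToyCoordinatorShard` and its methods are hypothetical and are not Kafka's `CoordinatorRuntime` API; the exception text only paraphrases the logged error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical, heavily simplified coordinator shard (not Kafka's real runtime).
final class ToyCoordinatorShard {
    // End offset of the records this shard appended while it was leader.
    private long lastWrittenOffset = 0L;
    // Last high watermark applied to the in-memory state.
    private long lastCommittedOffset = 0L;
    // Deferred writes keyed by the offset they must wait for.
    private final TreeMap<Long, List<Consumer<String>>> deferred = new TreeMap<>();

    // Appends numRecords locally and parks the response until they are committed.
    void write(int numRecords, Consumer<String> response) {
        lastWrittenOffset += numRecords;
        deferred.computeIfAbsent(lastWrittenOffset, k -> new ArrayList<>()).add(response);
    }

    // Applies a high watermark; the check mirrors the logged invariant.
    void onHighWatermarkUpdated(long offset) {
        if (offset > lastWrittenOffset) {
            throw new IllegalStateException("New committed offset " + offset
                + " must be less than or equal to " + lastWrittenOffset);
        }
        lastCommittedOffset = offset;
        // Complete, with success, every deferred write at or below the new offset.
        NavigableMap<Long, List<Consumer<String>>> ready = deferred.headMap(offset, true);
        ready.values().forEach(responses -> responses.forEach(r -> r.accept("SUCCESS")));
        ready.clear();
    }

    long lastCommittedOffset() {
        return lastCommittedOffset;
    }
}
```

For example, `write(10, ...)` parks a response at offset 10. If the log is then truncated and the new leader's records carry the high watermark to 10, `onHighWatermarkUpdated(10)` completes that response with `SUCCESS` even though the shard's own records are gone, while a high watermark of 12 throws instead.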
This patch gates high watermark propagation in
`CoordinatorPartitionWriter.ListenerAdapter` on the partition's
leadership. The adapter stops forwarding high watermark updates once the
partition transitions to follower, is deleted, or fails. The partition
signals these transitions (via `PartitionListener`) before its fetcher
is restarted (see `ReplicaManager#applyDelta`), i.e. before any such
high watermark can be produced, so the coordinator never observes a high
watermark that it should not apply. The pending deferred operations then
remain in place and are failed with `NOT_COORDINATOR` when the
coordinator is unloaded, so clients correctly retry against the new
coordinator.
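The gating itself is small. Below is a rough Java rendering of the Scala `ListenerAdapter` from this patch. `HwListener` and `PartitionEvents` are hypothetical stand-ins for `PartitionWriter.Listener` and `PartitionListener`, and topic partitions are plain strings so the sketch has no Kafka dependency.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for PartitionWriter.Listener.
interface HwListener {
    void onHighWatermarkUpdated(String tp, long offset);
}

// Hypothetical stand-in for the partition-side callbacks the patch overrides.
interface PartitionEvents {
    void onHighWatermarkUpdated(String tp, long offset);
    void onBecomingFollower(String tp);
    void onFailed(String tp);
    void onDeleted(String tp);
}

// Forwards high watermark updates only during the current leadership tenure.
final class GatedListenerAdapter implements PartitionEvents {
    private final HwListener listener;
    // Starts true; any loss of leadership clears it and nothing sets it again.
    private final AtomicBoolean active = new AtomicBoolean(true);

    GatedListenerAdapter(HwListener listener) {
        this.listener = listener;
    }

    @Override
    public void onHighWatermarkUpdated(String tp, long offset) {
        if (active.get()) {
            listener.onHighWatermarkUpdated(tp, offset);
        }
    }

    @Override
    public void onBecomingFollower(String tp) {
        active.set(false);
    }

    @Override
    public void onFailed(String tp) {
        active.set(false);
    }

    @Override
    public void onDeleted(String tp) {
        active.set(false);
    }
}
```

Because the flag only moves from true to false, an adapter stays silent even if leadership later returns to this broker; a fresh listener has to be registered for the new tenure, which is the contract documented on `PartitionWriter.Listener`.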
Gating on leadership rather than inspecting the offset is deliberate:
after truncation an offset can still have a snapshot in the registry
while holding the new leader's data, so no offset-based check can tell
whether a high watermark is safe to apply.
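A small worked example of that point, under a hypothetical model in which the local log records which broker wrote each offset and the registry keeps a snapshot at every end offset this broker produced as leader. `TruncationExample` and its methods are illustrative only; they show an offset-only check accepting a high watermark that actually covers the new leader's records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model: writerByOffset.get(i) is the broker that wrote offset i,
// and snapshots holds the end offsets at which this broker took a snapshot.
final class TruncationExample {
    final List<Integer> writerByOffset = new ArrayList<>();
    final TreeSet<Long> snapshots = new TreeSet<>();

    // As leader: append records and snapshot the new end offset.
    void appendAsLeader(int brokerId, int count) {
        for (int i = 0; i < count; i++) {
            writerByOffset.add(brokerId);
        }
        snapshots.add((long) writerByOffset.size());
    }

    // As follower: drop every record from the given offset onward.
    // The snapshot registry is not told, so its snapshots survive.
    void truncateTo(int offset) {
        writerByOffset.subList(offset, writerByOffset.size()).clear();
    }

    // As follower: copy the new leader's records; no snapshots are taken.
    void replicate(int leaderId, int count) {
        for (int i = 0; i < count; i++) {
            writerByOffset.add(leaderId);
        }
    }

    // Offset-only check: a snapshot exists at the high watermark.
    boolean offsetCheckAccepts(long highWatermark) {
        return snapshots.contains(highWatermark);
    }

    // Ground truth: every record below the high watermark was written by myId.
    boolean actuallySafe(int myId, long highWatermark) {
        for (int i = 0; i < highWatermark; i++) {
            if (writerByOffset.get(i) != myId) {
                return false;
            }
        }
        return true;
    }
}
```

With broker 0 writing offsets 0 to 7 in two batches (snapshots at 4 and 8), then truncating to 4 and replicating five records from broker 1, `offsetCheckAccepts(8)` is true while `actuallySafe(0, 8)` is false, because offsets 4 to 7 now hold broker 1's data.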
Reviewers: Sean Quah <[email protected]>
---
.../common/runtime/PartitionWriter.java | 29 ++++++++++++-
.../group/CoordinatorPartitionWriter.scala | 28 +++++++++++-
.../group/CoordinatorPartitionWriterTest.scala | 50 ++++++++++++++++++++++
.../unit/kafka/server/ReplicaManagerTest.scala | 47 ++++++++++++++++++++
4 files changed, 151 insertions(+), 3 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
index 2d974867c63..54726fd5b29 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
@@ -32,9 +32,29 @@ public interface PartitionWriter {
/**
* Listener allowing to listen to high watermark changes. This is meant
- * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.
+ * to be used in conjunction with {@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords, short)}.
+ * <p>
+ * A registered listener observes a single leadership tenure of the partition. It is
+ * delivered high watermark updates only while this broker is the leader of the
+ * partition. Once the partition is no longer led by this broker (it transitions to
+ * follower, is deleted, or fails), the listener is permanently retired: no further
+ * updates are delivered to it, even if the broker later regains leadership. A new
+ * listener must be registered to observe a subsequent tenure.
+ * <p>
+ * Retiring the listener is required because, after a leadership change, the local log
+ * can be truncated and re-replicated from the new leader, so a high watermark observed
+ * afterwards may advance over records that this broker never wrote. This guarantees
+ * that every delivered update advances only over records that this broker wrote as
+ * leader.
*/
interface Listener {
+ /**
+ * Called when the high watermark of the partition advances. Only invoked while
+ * this broker is the leader of the partition (see {@link Listener}).
+ *
+ * @param tp The topic partition.
+ * @param offset The new high watermark.
+ */
void onHighWatermarkUpdated(
TopicPartition tp,
long offset
@@ -42,7 +62,12 @@ public interface PartitionWriter {
}
/**
- * Register a {{@link Listener}}.
+ * Register a {@link Listener}.
+ * <p>
+ * The listener observes only the current leadership tenure: as described on
+ * {@link Listener}, it stops receiving updates once the partition is no longer led by
+ * this broker and is not re-armed if leadership is regained. A new listener must be
+ * registered to observe a later tenure.
*
* @param tp The partition to register the listener to.
* @param listener The listener.
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 32937821527..ad6b0158a02 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -28,20 +28,46 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Map
/**
* ListenerAdapter adapts the PartitionListener interface to the
* PartitionWriter.Listener interface.
+ *
+ * This upholds the PartitionWriter.Listener contract that high watermark updates are
+ * delivered only while the partition is led by this broker. When the partition
+ * transitions to follower, is deleted or fails, its local log can be truncated and
+ * re-replicated from the new leader, so the high watermark would advance over records
+ * that this broker did not write; propagating those updates would corrupt the
+ * coordinator's committed state. The partition notifies these transitions before its
+ * fetcher is restarted (see ReplicaManager#applyDelta), i.e. before any such update can
+ * be produced, so gating on this flag is sufficient to stop them.
*/
private[group] class ListenerAdapter(
val listener: PartitionWriter.Listener
) extends PartitionListener {
+ private val active = new AtomicBoolean(true)
+
override def onHighWatermarkUpdated(
tp: TopicPartition,
offset: Long
): Unit = {
- listener.onHighWatermarkUpdated(tp, offset)
+ if (active.get()) {
+ listener.onHighWatermarkUpdated(tp, offset)
+ }
+ }
+
+ override def onBecomingFollower(tp: TopicPartition): Unit = {
+ active.set(false)
+ }
+
+ override def onFailed(tp: TopicPartition): Unit = {
+ active.set(false)
+ }
+
+ override def onDeleted(tp: TopicPartition): Unit = {
+ active.set(false)
}
override def equals(that: Any): Boolean = that match {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 57fab6b6865..6d6bcd4154e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -68,6 +68,56 @@ class CoordinatorPartitionWriterTest {
)
}
+ @Test
+ def testListenerAdapterPropagatesHighWatermarkUpdates(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val updates = new util.ArrayList[Long]()
+ val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+ override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+ })
+
+ adapter.onHighWatermarkUpdated(tp, 10L)
+ adapter.onHighWatermarkUpdated(tp, 20L)
+
+ assertEquals(util.List.of(10L, 20L), updates)
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterBecomingFollower(): Unit = {
+ assertHighWatermarkPropagationStops(_.onBecomingFollower(_))
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterFailed(): Unit = {
+ assertHighWatermarkPropagationStops(_.onFailed(_))
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterDeleted(): Unit = {
+ assertHighWatermarkPropagationStops(_.onDeleted(_))
+ }
+
+ /**
+ * Verifies that no high watermark update is propagated to the wrapped listener
+ * once the given transition has been signalled. Such updates are not safe to
+ * apply because the partition is no longer led by this broker.
+ */
+ private def assertHighWatermarkPropagationStops(
+ transition: (ListenerAdapter, TopicPartition) => Unit
+ ): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val updates = new util.ArrayList[Long]()
+ val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+ override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+ })
+
+ adapter.onHighWatermarkUpdated(tp, 10L)
+ transition(adapter, tp)
+ adapter.onHighWatermarkUpdated(tp, 20L)
+
+ assertEquals(util.List.of(10L), updates)
+ }
+
@Test
def testConfig(): Unit = {
val tp = new TopicPartition("foo", 0)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 753b831b594..2079c9ba873 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -5409,6 +5409,53 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testPartitionListenerWhenPartitionBecomesFollower(): Unit = {
+ val aliveBrokersIds = Seq(0, 1)
+ val leaderEpoch = 5
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+ brokerId = 0, aliveBrokersIds)
+ try {
+ val tp = new TopicPartition(topic, 0)
+ val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+ val listener = new MockPartitionListener
+ listener.verify()
+
+ // Broker 0 becomes leader of the partition.
+ val leaderDelta = createLeaderDelta(
+ topicId = topicId,
+ partition = tp,
+ leaderId = 0,
+ replicas = replicas,
+ isr = replicas,
+ leaderEpoch = leaderEpoch
+ )
+ replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))
+
+ // Register a listener.
+ assertTrue(replicaManager.maybeAddListener(tp, listener))
+ listener.verify()
+
+ // Broker 0 transitions to follower of the partition with broker 1 as the new
+ // leader. The listener is notified that the partition is becoming a follower.
+ // This happens before the follower starts fetching from the new leader, hence
+ // before any high watermark update reflecting the new leader's records.
+ val followerDelta = createFollowerDelta(
+ topicId = topicId,
+ partition = tp,
+ followerId = 0,
+ leaderId = 1,
+ leaderEpoch = leaderEpoch + 1
+ )
+ replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply()))
+
+ listener.verify(expectedFollower = true)
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)