RivenSun created KAFKA-19902:
--------------------------------
Summary: Consumer triggers OFFSET_OUT_OF_RANGE and resets to
earliest when committed offset's epoch has been deleted
Key: KAFKA-19902
URL: https://issues.apache.org/jira/browse/KAFKA-19902
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Affects Versions: 3.9.0
Reporter: RivenSun
h2. Summary
When a partition leader changes and the consumer commits offsets during or after
the change, and the committed offset's epoch is subsequently deleted by the
retention policy, the consumer encounters an OFFSET_OUT_OF_RANGE error and
resets to earliest (if auto.offset.reset=earliest), causing massive message
reprocessing.

The root cause is that SubscriptionState.allConsumed() uses position.offsetEpoch
instead of position.currentLeader.epoch when constructing the OffsetAndMetadata
for the commit; the former can become stale when leader changes occur.
----
h2. Environment
Cluster Configuration:
* Kafka Server Version: 3.9.0
* Kafka Client Version: 3.9.0
* Topic: 200 partitions, 7-day retention, no tiered storage
* Consumer Group: 45 consumers (1 KafkaConsumer thread per machine)
* No broker/controller restarts occurred
* High throughput producer continuously writing messages
Consumer Configuration:
{code:java}
auto.offset.reset=earliest
enable.auto.commit=true {code}
Consumer Code:
* Registered a ConsumerRebalanceListener
* Calls kafkaConsumer.commitSync() in the onPartitionsRevoked() method (a minimal sketch follows below)
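For reference, a minimal sketch of the consumer-side setup described above, not the actual production code (bootstrap address, topic, group id, and processing are illustrative):
{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebalanceCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");     // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit the current positions before the partitions are reassigned.
                // Internally this goes through SubscriptionState.allConsumed(), which is
                // where the stale offsetEpoch ends up in the committed OffsetAndMetadata.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // no special handling
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(record -> { /* application processing */ });
        }
    }
}
{code}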
----
h2. Problem Description
In a scenario where the consumer group had no lag, consumers suddenly consumed
a massive amount of messages, far exceeding what the producers had written in
the preceding few minutes. Investigation revealed that multiple partitions had
reset to the earliest offset and reprocessed up to 7 days of historical data.
----
h2. Observed Symptoms (Timeline)
# Consumer group rebalance occurred (triggered by normal consumer group
management)
# Consumer logged OFFSET_OUT_OF_RANGE errors immediately after rebalance
# Consumer reset to earliest offset due to auto.offset.reset=earliest
configuration
# Producer logged NotLeaderOrFollowerException around the same timeframe,
indicating partition leader changes
# Consumer did not log any NOT_LEADER_OR_FOLLOWER errors (these are DEBUG
level and not visible in production logs)
----
h2. Root Cause Analysis
h3. The Problem Chain
{noformat}
1. Leader change occurs (epoch changes from N to N+1)
        ↓
2. Consumer continues processing old batches (epoch=N)
        ↓
3. Consumer commits offset during/after rebalance
   ├─ Committed offset: 1000
   └─ Committed epoch: N (using position.offsetEpoch from the old batch)
        ↓
4. High throughput + retention policy causes old segments (epoch=N) to be deleted
        ↓
5. Consumer restarts/rebalances and fetches the committed offset
   ├─ Tries to validate offset 1000 with epoch=N
   └─ Broker cannot find epoch=N (segments deleted)
        ↓
6. Broker returns OFFSET_OUT_OF_RANGE
        ↓
7. Consumer resets to the earliest offset
{noformat}
h3. Code Analysis
The problematic code in SubscriptionState.allConsumed():
{code:java}
// kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition())
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                partitionState.position.offset,
                partitionState.position.offsetEpoch, // ❌ Problem: uses offsetEpoch from the consumed batch
                ""));
    });
    return allConsumed;
}
{code}
Why this is problematic: the FetchPosition class contains two different epoch
values:
* offsetEpoch: the epoch from the last consumed record's batch
* currentLeader.epoch: the current partition leader's epoch from metadata

When committing offsets, we should use currentLeader.epoch instead of
offsetEpoch because:
# offsetEpoch represents the epoch of already-consumed data (historical)
# currentLeader.epoch represents the current partition leader (up to date)
h3. Scenarios Where These Epochs Diverge
Scenario A: Leader changes while the consumer is processing old batches
* T1: Consumer fetches a batch with epoch=5
* T2: Leader changes to epoch=6
* T3: Metadata updates with the new leader epoch=6
* T4: Consumer commits the offset
** offsetEpoch = 5 (from the batch being processed)
** currentLeader.epoch = 6 (from the updated metadata)
** Problem: commits epoch=5, which may soon be deleted

Scenario B: Recovery from a committed offset after a leader change
* Consumer commits an offset with the old epoch=N
* Leader changes to epoch=N+1
* Old segments (epoch=N) are deleted by the retention policy
* Consumer rebalances and tries to restore from the committed offset
** offsetEpoch = N (from the committed offset)
** currentLeader.epoch = N+1 (from current metadata)
** Problem: validation fails because epoch=N no longer exists
----
h2. Steps to Reproduce
This is a timing-sensitive edge case. The following conditions increase the
likelihood:
# Setup:
#* High-throughput topic (to trigger faster log rotation)
#* Relatively short retention period (e.g., 7 days)
#* Consumer group with a rebalance listener calling commitSync()
#* enable.auto.commit=true (or any manual commit)
# Trigger:
#* Trigger a partition leader change (broker restart, controller election, etc.); one programmatic option is sketched after this list
#* Simultaneously or shortly after, trigger a consumer group rebalance
#* Wait for the retention policy to delete old log segments
# Expected Result: the consumer should resume from the committed offset
# Actual Result: the consumer encounters OFFSET_OUT_OF_RANGE and resets to earliest
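As noted in the trigger step above, one programmatic way to force a partition leader change (instead of restarting a broker) is a preferred-leader election through the Admin API. This is a hedged sketch under the assumption that the current leader is not the preferred replica (otherwise leadership does not move); bootstrap address, topic, and partition are illustrative:
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class ForceLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative

        try (Admin admin = Admin.create(props)) {
            // Ask the controller to elect the preferred replica as leader for one partition.
            // Leadership only changes if the current leader differs from the preferred replica.
            TopicPartition tp = new TopicPartition("my-topic", 0);             // illustrative
            admin.electLeaders(ElectionType.PREFERRED, Collections.singleton(tp))
                 .partitions()
                 .get();
        }
    }
}
{code}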
----
h2. Impact
* Data Reprocessing: Consumers may reprocess up to retention.ms worth of data
* Service Degradation: Sudden spike in consumer throughput can overwhelm
downstream systems
* Resource Waste: Unnecessary CPU, memory, and network usage
* Duplicate Processing: with auto.offset.reset=earliest, duplicate processing of
already-consumed messages is guaranteed
----
h2. Proposed Fix
Modify SubscriptionState.allConsumed() to {color:#de350b}*use
currentLeader.epoch instead of offsetEpoch*{color}:
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition())
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                partitionState.position.offset,
                partitionState.position.currentLeader.epoch, // ✅ Use the current leader epoch
                ""));
    });
    return allConsumed;
}
{code}
Rationale:
* currentLeader.epoch represents the most recent known leader epoch from
metadata
* This ensures committed offsets reference epochs that are more likely to
still exist
* In the majority of cases (no leader change), both epochs are identical
* When they diverge, currentLeader.epoch is the safer choice for commit
----
h2. Additional Notes
Why consumers don't log NOT_LEADER_OR_FOLLOWER errors: all consumer-side
handling of NOT_LEADER_OR_FOLLOWER errors uses DEBUG-level logging:
{code:java}
// FetchCollector.java line 325
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());

// AbstractFetch.java line 207
log.debug("For {}, received error {}, with leaderIdAndEpoch {}", partition, partitionError, ...);

// OffsetsForLeaderEpochUtils.java line 102
LOG.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", ...);
{code}
This makes the issue difficult to diagnose in production environments.
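Until those log levels are revisited, operators can surface these messages by enabling DEBUG logging for the consumer internals package on the application side. A sketch assuming a Log4j 2 properties-style configuration (adapt the logger declaration to whichever logging backend the application uses):
{code:java}
# log4j2.properties: raise consumer-internals logging to DEBUG so leader-change errors become visible
logger.kafkaConsumerInternals.name = org.apache.kafka.clients.consumer.internals
logger.kafkaConsumerInternals.level = DEBUG
{code}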
----
h2. Workarounds (Until Fixed)
# Increase retention period to reduce likelihood of epoch deletion
# Monitor consumer lag to ensure it stays low
# Reduce rebalance frequency (increase max.poll.interval.ms,
session.timeout.ms)
# Use cooperative rebalance strategy to minimize rebalance impact
# Consider using auto.offset.reset=latest if reprocessing is more costly than
data loss (application-dependent); a configuration sketch covering workarounds
3-5 follows below
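A consolidated configuration sketch for the consumer settings referenced in workarounds 3-5 (values are illustrative starting points, not recommendations):
{code:java}
max.poll.interval.ms=600000
session.timeout.ms=45000
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# Only if reprocessing is more costly than potential data loss for the application:
# auto.offset.reset=latest
{code}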
----
h2. Related Code References
h3. 1. The problematic method: SubscriptionState.allConsumed()
Location: org.apache.kafka.clients.consumer.internals.SubscriptionState
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition())
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                partitionState.position.offset,
                partitionState.position.offsetEpoch, // ❌ Uses offsetEpoch instead of currentLeader.epoch
                ""));
    });
    return allConsumed;
}
{code}
h3. 2. How FetchPosition is updated during normal consumption
Location: org.apache.kafka.clients.consumer.internals.FetchCollector
{code:java}
if (nextInLineFetch.nextFetchOffset() > position.offset) {
    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
            nextInLineFetch.nextFetchOffset(),
            nextInLineFetch.lastEpoch(), // offsetEpoch: from the consumed batch
            position.currentLeader);     // currentLeader: inherited from the old position, NOT updated!
    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
            position, nextPosition, tp, partRecords.size());
    subscriptions.position(tp, nextPosition);
    positionAdvanced = true;
}
{code}
Key Issue: The currentLeader field is inherited from the previous position and
not automatically updated during normal consumption. It only gets updated when
leader change errors are detected.
h3. 3. How committed offsets are restored after rebalance
Location:
org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets()
{code:java}
public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata,
                                           final ConsumerMetadata metadata,
                                           final SubscriptionState subscriptions) {
    for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
        final TopicPartition tp = entry.getKey();
        final OffsetAndMetadata offsetAndMetadata = entry.getValue();
        if (offsetAndMetadata != null) {
            // first update the epoch if necessary
            entry.getValue().leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));

            // it's possible that the partition is no longer assigned when the response is received,
            // so we need to ignore seeking if that's the case
            if (subscriptions.isAssigned(tp)) {
                final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
                final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
                        offsetAndMetadata.offset(),
                        offsetAndMetadata.leaderEpoch(), // offsetEpoch from the committed offset (may be old)
                        leaderAndEpoch);                 // currentLeader from current metadata (may be new)

                subscriptions.seekUnvalidated(tp, position);
                log.info("Setting offset for partition {} to the committed offset {}", tp, position);
            }
        }
    }
}
{code}
The Divergence Point: When restoring from committed offsets, offsetEpoch comes
from the stored offset (potentially old), while currentLeader comes from fresh
metadata (potentially new after leader change).
h3. 4. How OffsetsForLeaderEpoch validation request is constructed
Location:
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.prepareRequest()
{code:java}
static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
        Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
    OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection(requestData.size());
    requestData.forEach((topicPartition, fetchPosition) ->
        fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
            OffsetForLeaderTopic topic = topics.find(topicPartition.topic());
            if (topic == null) {
                topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic());
                topics.add(topic);
            }
            topic.partitions().add(new OffsetForLeaderPartition()
                .setPartition(topicPartition.partition())
                .setLeaderEpoch(fetchEpoch) // Uses offsetEpoch for validation
                .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
                    .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
            );
        })
    );
    return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
}
{code}
The Validation Problem: The validation request uses fetchEpoch (which is
offsetEpoch) to validate against the broker. If this epoch no longer exists in
the broker's log, validation fails and triggers OFFSET_OUT_OF_RANGE.
h3. 5. FetchPosition class definition
Location:
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition
{code:java}
/**
 * Represents the position of a partition subscription.
 *
 * This includes the offset and epoch from the last record in the batch from a FetchResponse.
 * It also includes the leader epoch at the time the batch was consumed.
 */
public static class FetchPosition {
    public final long offset;
    final Optional<Integer> offsetEpoch;         // Epoch from the last consumed record's batch
    final Metadata.LeaderAndEpoch currentLeader; // Current partition leader info from metadata

    FetchPosition(long offset) {
        this(offset, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
    }

    public FetchPosition(long offset, Optional<Integer> offsetEpoch, Metadata.LeaderAndEpoch currentLeader) {
        this.offset = offset;
        this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
        this.currentLeader = Objects.requireNonNull(currentLeader);
    }

    @Override
    public String toString() {
        return "FetchPosition{" +
                "offset=" + offset +
                ", offsetEpoch=" + offsetEpoch +
                ", currentLeader=" + currentLeader +
                '}';
    }
}
{code}
Class Design: The class contains both offsetEpoch (historical data epoch) and
currentLeader.epoch (current metadata epoch), but allConsumed() only uses the
former when committing.