Jason Gustafson commented on KAFKA-4682:
Sorry for the late response. There seem to be a few open questions:
1. Does the consumer need the ability to control the retention timeout or is a
broker config sufficient? I am not too sure about this. There is at least one
use case (ConsoleConsumer) where we might intentionally set a low value, but
I'm not sure how bad it would be to let it stick with the default. It certainly
would have been helpful prior to Ewen's KIP.
2. Do we still need offset-level expiration or should we move it to the group?
Personally, it feels a little odd to expire offsets at different times once a
group is empty. It's a little more intuitive to expire them all at once.
Another way to view this would be that we deprecate the offset retention
setting and add a group metadata retention setting. Once the group has gone
empty, we start its retention timer. If it expires, we clear all of its state.
3. Do we need to change the format of the offset metadata messages? Currently
the offset metadata that is stored in the log includes an expiration timestamp.
This won't make much sense any more because we won't know what timestamp to use
when the offset is first stored. While we're at it, we could probably also
remove the commit timestamp and use the timestamp from the message itself. This
also depends on the answer to the first question.
4. Should we start the expiration timer for an individual offset if the group
is no longer subscribed to the corresponding topic? My inclination is to keep
it simple and say no, but I guess there is a risk that this tends to grow the
cache more than existing behavior. If we're concerned about this, then we
probably need to keep the individual offset expiration timer. Unfortunately
because of the generic group protocol (which is also used in Connect), we don't
currently have the ability to inspect subscriptions to know if a topic is still
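
To make question 2 concrete, here is a rough Python sketch of group-level expiration: the retention timer starts only once the group goes empty, and all of the group's offsets are dropped together when it fires. This is a toy in-memory model, not the coordinator's actual code. The names GroupState and GroupRetentionCache, and the rule that a commit to an empty group restarts the timer, are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple

TopicPartition = Tuple[str, int]
DAY_MS = 24 * 60 * 60 * 1000


@dataclass
class GroupState:
    """Hypothetical per-group state held by the coordinator."""
    offsets: Dict[TopicPartition, int] = field(default_factory=dict)
    members: Set[str] = field(default_factory=set)
    empty_since_ms: Optional[int] = None  # None while the group has members


class GroupRetentionCache:
    """Expires a whole group at once, and only after it has been empty
    for retention_ms. Individual offsets carry no expiration timestamp."""

    def __init__(self, retention_ms: int) -> None:
        self.retention_ms = retention_ms
        self.groups: Dict[str, GroupState] = {}

    def join(self, group_id: str, member_id: str) -> None:
        group = self.groups.setdefault(group_id, GroupState())
        group.members.add(member_id)
        group.empty_since_ms = None  # an active member cancels the timer

    def leave(self, group_id: str, member_id: str, now_ms: int) -> None:
        group = self.groups[group_id]
        group.members.discard(member_id)
        if not group.members:
            group.empty_since_ms = now_ms  # retention timer starts here

    def commit(self, group_id: str, tp: TopicPartition, offset: int, now_ms: int) -> None:
        group = self.groups.setdefault(group_id, GroupState())
        group.offsets[tp] = offset
        if not group.members:
            # Assumption: a commit to an empty group restarts its timer.
            group.empty_since_ms = now_ms

    def expire(self, now_ms: int) -> List[str]:
        """Remove every group whose timer has run out; return their ids."""
        expired = [
            gid for gid, g in self.groups.items()
            if g.empty_since_ms is not None
            and now_ms - g.empty_since_ms >= self.retention_ms
        ]
        for gid in expired:
            del self.groups[gid]
        return expired


cache = GroupRetentionCache(retention_ms=7 * DAY_MS)
cache.join("g", "m1")
cache.commit("g", ("low-traffic", 0), 42, now_ms=0)
print(cache.expire(now_ms=30 * DAY_MS))  # group is still active, nothing expires
cache.leave("g", "m1", now_ms=30 * DAY_MS)
print(cache.expire(now_ms=37 * DAY_MS))  # empty for 7 days, whole group removed
```

In this model an active consumer on a low-traffic partition keeps its offset indefinitely, which is the behavior the issue asks for. It does not address question 4, since it never looks at subscriptions.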
> Committed offsets should not be deleted if a consumer is still active
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
> Issue Type: Bug
> Reporter: James Cheng
> Kafka will delete committed offsets that are older than
> If there is an active consumer on a low traffic partition, it is possible
> that Kafka will delete the committed offset for that consumer. Once the
> offset is deleted, a restart or a rebalance of that consumer will cause the
> consumer to not find any committed offset and start consuming from
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker
> failover might also cause you to start reading from auto.offset.reset (due to
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The
> timer should only start after a consumer group goes inactive. For example, if
> a consumer group goes inactive, then after 1 week, delete the offsets for
> that consumer group. This is a solution that [~junrao] mentioned in
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure
> that it is more frequent than offsets.retention.minutes (a broker-side
> setting that a consumer might not be aware of)
> # Turn the value of offsets.retention.minutes up really really high. You have
> to make sure it is higher than any valid low-traffic rate that you want to
> support. For example, if you want to support a topic where someone produces
> once a month, you would have to set offsets.retention.minutes to 1 month.
> # Turn on enable.auto.commit (this is essentially #1, but easier to
> None of these are ideal.
> #1 can be spammy. It requires your consumers know something about how the
> brokers are configured. Sometimes it is out of your control. Mirrormaker, for
> example, only commits offsets on partitions where it receives data. It is
> also duplicated logic that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as
> memory-size on the broker (to answer OffsetFetch).
> #3 I think has the potential for message loss (the consumer might commit
> offsets for messages that are not yet fully processed).
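
For workaround #2, the broker value has to be derived from the slowest producer you want to support. A small Python sketch of that arithmetic follows. The helper name retention_minutes and the 2x safety margin are assumptions for illustration, not anything Kafka prescribes.

```python
import math
from datetime import timedelta


def retention_minutes(max_idle: timedelta, margin: float = 2.0) -> int:
    """Whole minutes covering the longest expected gap between commits,
    scaled by a safety margin (the margin is an assumption, not a Kafka rule)."""
    return math.ceil(max_idle * margin / timedelta(minutes=1))


# A topic that is produced to roughly once a month (31 days taken as a month).
value = retention_minutes(timedelta(days=31))
print(f"offsets.retention.minutes={value}")  # offsets.retention.minutes=89280
```

The result still has to be set on the brokers, which is exactly the coupling between consumers and broker configuration that the description complains about.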