[ https://issues.apache.org/jira/browse/KAFKA-4089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sumant Tambe updated KAFKA-4089:
--------------------------------
    Description:
The basic idea of batch expiration is that we don't expire batches when the producer thinks "it can make progress". Currently the notion of "making progress" involves only in-flight requests (muted partitions). That's not sufficient. The other half of "making progress" is that if we have stale metadata, we cannot trust it and therefore cannot conclude that we are unable to make progress. Therefore, we don't expire batches when metadata is stale. This also implies we don't want to expire batches when we can still make progress, even if the batch remains in the queue longer than the batch expiration time.

The current condition in {{abortExpiredBatches}} that bypasses muted partitions is necessary but not sufficient. It should additionally restrict ejection when metadata is stale.

Conversely, it should expire batches only when both of the following are true
# meta-data is fresh AND
# batch remained in the queue longer than request timeout.

  was:
The basic idea of batch expiration is that we don't expire batches when the producer thinks "it can make progress". Currently the notion of "making progress" involves only in-flight requests (muted partitions). That's not sufficient. The other half of "making progress" is that if we have stale metadata, we cannot trust it and therefore cannot conclude that we are unable to make progress. Therefore, we don't expire batches when metadata is stale. This also implies we don't want to expire batches when we can still make progress, even if the batch remains in the queue longer than the batch expiration time.

More concretely, the batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) ejects batches when the cluster metadata needs an update ({{Metadata.timeToNextUpdate==0}}). In this case, no nodes are "ready" to send data to ({{result.readyNodes}} is empty). As a consequence, {{Sender.drain}} does not drain any batch at all and therefore no new topic-partitions are muted.
The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) bypasses muted partitions only. As there are no new muted partitions, everything that was not sent in previous drains is subject to expiration. As a result, a group of batches expires if the batches linger in the queue for longer than {{requestTimeout}}.

The current condition in {{abortExpiredBatches}} that bypasses muted partitions is necessary but not sufficient. It should additionally restrict ejection when metadata is stale.

Conversely, it should expire batches only when all of the following are true
# !muted AND
# meta-data is fresh but leader not available AND
# batch remained in the queue longer than request timeout.

> KafkaProducer raises Batch Expired exception
> ---------------------------------------------
>
>                 Key: KAFKA-4089
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4089
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.0.1
>            Reporter: Sumant Tambe
>            Assignee: Dong Lin
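
The expiration condition proposed in the updated description can be sketched roughly as follows. This is only an illustration under stated assumptions: {{BatchExpirySketch}}, {{shouldExpire}}, and its parameters are hypothetical names, not the actual {{RecordAccumulator}} API, and the 30-second timeout is an arbitrary example value.

```java
/**
 * Illustrative sketch only: the class, method, and parameter names are
 * hypothetical and do not mirror the real Kafka producer internals.
 */
final class BatchExpirySketch {

    /**
     * Returns true only when the producer has no reason to believe it can
     * still make progress on the batch: the partition is not muted (no
     * in-flight request), the metadata is fresh, and the batch has waited
     * in the queue longer than the request timeout.
     */
    static boolean shouldExpire(boolean partitionMuted,
                                boolean metadataStale,
                                long queuedMs,
                                long requestTimeoutMs) {
        if (partitionMuted) {
            return false; // an in-flight request may still succeed
        }
        if (metadataStale) {
            return false; // stale metadata cannot prove that no progress is possible
        }
        return queuedMs > requestTimeoutMs;
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 30_000L; // assumed timeout for the example

        // Fresh metadata, not muted, queued too long: eligible for expiration.
        System.out.println(shouldExpire(false, false, 45_000L, requestTimeoutMs));
        // Stale metadata: keep the batch even though it is overdue.
        System.out.println(shouldExpire(false, true, 45_000L, requestTimeoutMs));
        // Muted partition: keep the batch, as the existing check already does.
        System.out.println(shouldExpire(true, false, 45_000L, requestTimeoutMs));
    }
}
```

Keeping the muted check and the staleness check as separate early returns mirrors the point in the description that the muted-partition bypass is necessary but not sufficient on its own.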