[ https://issues.apache.org/jira/browse/KAFKA-5494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142077#comment-16142077 ]
ASF GitHub Bot commented on KAFKA-5494:
---------------------------------------

GitHub user apurvam opened a pull request:

    https://github.com/apache/kafka/pull/3743

    KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1

    Here we introduce client and broker changes to support multiple
    in-flight requests while still guaranteeing idempotence. Two major
    problems need to be solved:

    1. Sequence number management on the client when there are request
       failures. When a batch fails, subsequent in-flight batches will also
       fail with `OutOfOrderSequenceException`. This must be handled on the
       client with intelligent sequence reassignment. We must also deal with
       the fatal failure of some batch: the subsequent batches must get
       different sequence numbers when they come back.
    2. On the broker, when we have multiple in-flight requests, we can get
       duplicates of multiple old batches. With this patch, we retain the
       record metadata for 5 older batches.

    I have added `TODO(reviewers)` comments for specific decisions in the
    code which are worth discussing.

    TODO:
    1. Add more unit tests, especially for loading different snapshot
       versions correctly, more client-side unit tests, and more broker-side
       tests to validate that we are caching the correct number of batches
       (some of this is already there).
    2. Update the system tests to check for ordering.
    3. Run a tight loop of system tests.
    4. Add comments about the assumptions made around the network client
       semantics of send/receive.
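The broker-side idea in point 2 above (retaining record metadata for the 5 most recent batches so that a duplicate can be answered without scanning the log) can be sketched roughly as follows. This is a minimal illustration, not the code in the pull request: the class names `BatchMetadata` and `RecentBatchCache`, their fields, and the exact-match duplicate check are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical record of what the broker remembers about one appended batch. */
final class BatchMetadata {
    final int firstSeq;
    final int lastSeq;
    final long lastOffset;

    BatchMetadata(int firstSeq, int lastSeq, long lastOffset) {
        this.firstSeq = firstSeq;
        this.lastSeq = lastSeq;
        this.lastOffset = lastOffset;
    }
}

/** Hypothetical bounded cache of the most recent batches from one producer. */
final class RecentBatchCache {
    private final int capacity;
    private final Deque<BatchMetadata> batches = new ArrayDeque<>();

    RecentBatchCache(int capacity) {
        this.capacity = capacity;
    }

    /** Remember a newly appended batch, evicting the oldest one when full. */
    void recordAppend(BatchMetadata batch) {
        if (batches.size() == capacity) {
            batches.removeFirst();
        }
        batches.addLast(batch);
    }

    /**
     * Return the cached metadata if an incoming batch has the same sequence
     * range as a retained batch. An empty result means the caller would have
     * to fall back to some other mechanism, such as scanning the log.
     */
    Optional<BatchMetadata> findDuplicate(int firstSeq, int lastSeq) {
        for (BatchMetadata b : batches) {
            if (b.firstSeq == firstSeq && b.lastSeq == lastSeq) {
                return Optional.of(b);
            }
        }
        return Optional.empty();
    }
}
```

With a capacity of 5, a duplicate of any of the last five appended batches is resolved from memory and its original offset can be returned to the producer; anything older misses the cache.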
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apurvam/kafka KAFKA-5494-increase-max-in-flight-for-idempotent-producer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3743.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3743

----
commit 005eee527ab425d8e3d8678aad4b5305cde6ca08
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-08-12T00:25:06Z

    Initial commit of client side changes with some tests

commit 63bf074a38ec3efef728863081805a36d9111038
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-08-17T23:49:10Z

    Implemented broker side changes to cache extra metadata.

    Todo:
    1) Write more unit tests.
    2) Handle deletion / retention / cleaning correctly.

commit 1ad49f30f03ff665f5657680cbcc5e045210ce45
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-08-23T00:10:39Z

    Change the client side code so that the sequence numbers are assigned
    and incremented during drain. If a batch is retried, its sequence
    number is unset during the completion handler. If the first inflight
    batch returns an error, the next sequence to assign is reset to the
    last ack'd sequence + 1.
commit d9b86b7cb8e7001a7d5fc42a2ec061ebd0332a6a
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-08-24T01:33:54Z

    WIP

commit 9ff885fe6db7172d28ea8fe406972a7763c0a49d
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-08-25T06:23:50Z

    Implemented log cleaning functionality with tests

commit 5508a194c74a8946a8451c01814324e6ba788cfe
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-08-25T19:27:03Z

    Fix merge issues after rebasing onto trunk

----

> Idempotent producer should not require
> max.in.flight.requests.per.connection=1 and acks=all
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5494
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5494
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>              Labels: exactly-once
>             Fix For: 1.0.0
>
>
> Currently, the idempotent producer (and hence the transactional producer)
> requires max.in.flight.requests.per.connection=1.
> This was done to simplify the implementation on the client and server. With
> some additional work, we can satisfy the idempotent guarantees with any
> number of in-flight requests. The changes on the client can be summarized as
> follows:
>
> # We increment sequence numbers when batches are drained.
> # If, for some reason, a batch fails with a retriable error, we know that all
> future batches would fail with an out of order sequence exception.
> # As such, the client should treat some OutOfOrderSequence errors as
> retriable. In particular, we should maintain the 'last acked sequence'. If
> the batch succeeding the last ack'd sequence has an OutOfOrderSequence, that
> is a fatal error. If a later batch fails with OutOfOrderSequence, it should
> be re-enqueued.
> # With the changes above, the producer queues should become priority
> queues ordered by the sequence numbers.
> # The partition is not ready unless the front of the queue has the next
> expected sequence.
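The client-side steps listed above can be sketched as a small per-partition queue. This is only an illustration under simplifying assumptions: each batch carries exactly one sequence number that is already assigned when it is enqueued, and the class `SequencedPartitionQueue` and all of its method names are hypothetical, not taken from the Kafka producer.

```java
import java.util.OptionalInt;
import java.util.PriorityQueue;

/** Hypothetical per-partition queue of batch sequence numbers. */
final class SequencedPartitionQueue {
    // Pending batches, ordered by sequence number (smallest first).
    private final PriorityQueue<Integer> pending = new PriorityQueue<>();
    private int lastAckedSequence = -1;
    private int nextExpectedSequence = 0;

    void enqueue(int sequence) {
        pending.add(sequence);
    }

    /** The partition is ready only if the head of the queue is the next expected sequence. */
    boolean isReady() {
        Integer head = pending.peek();
        return head != null && head == nextExpectedSequence;
    }

    /** Take the next batch to send, or nothing if the partition is not ready. */
    OptionalInt drain() {
        if (!isReady()) {
            return OptionalInt.empty();
        }
        nextExpectedSequence++;
        return OptionalInt.of(pending.poll());
    }

    void onAck(int sequence) {
        lastAckedSequence = sequence;
    }

    /** A retriable failure: put the batch back and rewind to just after the last ack. */
    void onRetriableFailure(int sequence) {
        pending.add(sequence);
        nextExpectedSequence = lastAckedSequence + 1;
    }

    /** OutOfOrderSequence is fatal only for the batch right after the last acked one. */
    void onOutOfOrderSequence(int sequence) {
        if (sequence == lastAckedSequence + 1) {
            throw new IllegalStateException(
                "Fatal OutOfOrderSequence for sequence " + sequence);
        }
        pending.add(sequence);
    }
}
```

In this sketch, a failed batch returns to the front of the queue because it has the smallest sequence number, and any newer batch stays blocked until the queue head matches the next expected sequence again.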
> With the changes above, we would get the benefits of multiple in-flight
> requests in normal cases. When there are failures, we automatically
> constrain to a single in-flight request until we get back in sequence.
>
> With multiple in-flight requests, we now have the possibility of getting
> duplicates for batches other than the last appended batch. In order to
> return the record metadata (including offset) of the duplicates inside the
> log, we would need to scan the tail of the log to find that metadata. This
> can be optimized by caching the metadata for the last 'n' batches. For
> instance, if the default max.inflight is 5, we could cache the record
> metadata of the last 5 batches, and fall back to a scan if the duplicate is
> not within those 5.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)