GitHub user apurvam opened a pull request:
https://github.com/apache/kafka/pull/4022
KAFKA-6015: Fix NullPointerExceptionInRecordAccumulator
It is possible for batches with sequence numbers to be in the `deque` while
at the same time the in flight batches in the `TransactionManager` are removed
due to a producerId reset.
In this case, when the batches in the `deque` are drained, we will get a
`NullPointerException` in the background thread due to this line:
```java
if (first.hasSequence() && first.baseSequence() !=
transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
```
Particularly, `transactionManager.nextBatchBySequence` will return null,
because there no inflight batches being tracked.
In this patch, we simply allow the batches in the `deque` to be drained if
there are no in flight batches being tracked in the TransactionManager. If they
succeed, well and good. If the responses come back with an error, the batces
will be ultimately failed in the producer with an `OutOfOrderSequenceException`
when the response comes back.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/apurvam/kafka
KAFKA-6015-npe-in-record-accumulator
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/4022.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4022
----
commit 8d4ced9f42afb16113ed9464697b4d52394e1304
Author: Apurva Mehta <[email protected]>
Date: 2017-10-05T05:56:44Z
Fix NPE in Accumulator
commit 18053f749d8b80bb878f7781f95cb02b58933f9f
Author: Apurva Mehta <[email protected]>
Date: 2017-10-05T06:34:47Z
Add test case to ensure that we can send queued batches from a previous
producer id after the producer state is reset
----
---