GitHub user apurvam opened a pull request: https://github.com/apache/kafka/pull/4029
KAFKA-6016: Make the reassign partitions system test use the idempotent producer With these changes, we are ensuring that the partitions being reassigned are from non-zero offsets. We also ensure that every message in the log has producerId and sequence number. This means that it successfully reproduces https://issues.apache.org/jira/browse/KAFKA-6003, as can be seen below: ``` [2017-10-05 20:57:00,466] ERROR [ReplicaFetcher replicaId=1, leaderId=4, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) kafka.common.KafkaException: Error processing data for partition test_topic-16 offset 682 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:171) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:168) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:168) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:168) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:168) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:218) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:109) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: Found no record of producerId=1000 on the broker. It is possible that the last message with the producerId=1000 has been removed due to hitting the retention limit. ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/apurvam/kafka KAFKA-6016-add-idempotent-producer-to-reassign-partitions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4029.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4029 ---- commit af48d74be4f2c4473d8f97664ff0f3e450bfe3ec Author: Apurva Mehta <apu...@confluent.io> Date: 2017-10-05T05:27:23Z Initial commit trying to create the scenario where we are creating a replica from scratch but starting from a non zero sequence when doing so. commit 9566f91b00a5a7c249823107e4792b844809ccca Author: Apurva Mehta <apu...@confluent.io> Date: 2017-10-05T05:52:24Z Use retention bytes to force segment deletion commit 6087b3ed01472d24677623c9b3ef92a3678da96f Author: Apurva Mehta <apu...@confluent.io> Date: 2017-10-05T21:16:47Z Configure the log so that we can reproduce the case where we are building producer state from a non zero sequence ---- ---