[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma resolved KAFKA-6683.
--------------------------------
Resolution: Fixed
Fix Version/s: 1.1.0
> ReplicaFetcher crashes with "Attempted to complete a transaction which was
> not started"
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-6683
> URL: https://issues.apache.org/jira/browse/KAFKA-6683
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Affects Versions: 1.0.0
> Environment: os: GNU/Linux
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
> Reporter: Chema Sanchez
> Assignee: Jason Gustafson
> Priority: Critical
> Fix For: 1.1.0
>
> Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing
> brokers of our Kafka clusters during maintenance operations.
> After a broker has been restarted or replaced, it may perform normally for
> some minutes and then suddenly throw the following exception and stop
> replicating some partitions:
> {code:none}
> [2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which was not started
>     at kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>     at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>     at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>     at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>     at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.log.Log.loadProducerState(Log.scala:514)
>     at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.truncateTo(Log.scala:1467)
>     at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
>     at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
>     at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>     at kafka.log.LogManager.truncateTo(LogManager.scala:445)
>     at kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {code}
> Since all brokers in a cluster are restarted during system updates, the issue
> sometimes manifested at the same time on different brokers holding replicas for
> the same partition, which caused downtime because not enough replicas were left
> in the ISR.
> It is necessary to restart the faulted broker in order to recover partition
> replication, but after hitting this issue we often find that the restarted
> broker shuts itself down with the following error, among many warnings caused
> by corrupted indices:
> {code:none}
> [2018-03-05 16:02:22,450] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current) (kafka.log.LogManager)
> [2018-03-05 16:02:22,453] FATAL [KafkaServer id=10] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current)
> {code}
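> For reference, a quick way to see which partitions are affected once a fetcher
> dies is to ask the stock topic tool for under-replicated and offline
> partitions; the ZooKeeper address below is a placeholder for our environment:
> {code:bash}
> # Partitions whose ISR is smaller than the full replica set
> ./kafka-topics.sh --zookeeper zk-host:2181 --describe --under-replicated-partitions
>
> # Partitions that currently have no available leader
> ./kafka-topics.sh --zookeeper zk-host:2181 --describe --unavailable-partitions
> {code}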
> When this happened, the only way to bring Kafka back up has been to delete all
> the data inside the log directory (/var/lib/kafka in our case).
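> The recovery cycle in that case is roughly the following sketch; the systemd
> unit name is an assumption about the environment, and the path is our
> log.dirs setting:
> {code:bash}
> # WARNING: this deletes all local partition data on the broker;
> # it will re-replicate everything from the other replicas afterwards.
> systemctl stop kafka        # "kafka" stands for whatever unit runs the broker
> rm -rf /var/lib/kafka/*     # log.dirs in our configuration
> systemctl start kafka
> {code}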
> The problem manifests randomly, but we managed to reproduce the ReplicaFetcher
> crash (although not the failed startup) outside our production cluster by
> doing the following:
> 1 - Set up a Kafka cluster running 3 brokers (see attached configuration): 10, 11 and 12.
> 2 - Create a topic with the following settings: Topic:mytopic2, PartitionCount:12, ReplicationFactor:3, Configs:segment.bytes=52428800,retention.ms=1800000 (a create command matching these settings is sketched after these steps).
> 3 - Run some producers like this:
> {code:bash}
> while true
> do
>   ./kafka-producer-perf-test.sh --topic mytopic2 --record-size=2048 \
>     --producer-props bootstrap.servers=ec2-XXX-XXX-XXX-XXX.eu-west-1.compute.amazonaws.com:9092 enable.idempotence=true \
>     --throughput 50 --num-records 6000 \
>     --transactional-id pruebatrans4 --transaction-duration-ms 100
> done
> {code}
> 4 - Run some consumers on mytopic2 (see the consumer example after these steps).
> 5 - Wait some time for segments to be rotated.
> 6 - Stop broker 11, remove everything inside /var/lib/kafka, and start it again.
> 7 - Wait for data to be replicated and for all replicas to rejoin the ISR.
> 8 - Stop broker 12, remove everything inside /var/lib/kafka, and start it again.
> 9 - Wait for data to be replicated and for all replicas to rejoin the ISR.
> 10 - Wait for the issue to manifest. If it does, after some minutes of normal behaviour broker 11 may suddenly stop replicating and some partitions may appear under-replicated.
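> Commands along the following lines cover steps 2 and 4; this is a sketch
> rather than the exact invocations used, and the ZooKeeper and bootstrap
> addresses are placeholders:
> {code:bash}
> # Step 2: create the topic with the settings listed above
> ./kafka-topics.sh --zookeeper zk-host:2181 --create --topic mytopic2 \
>   --partitions 12 --replication-factor 3 \
>   --config segment.bytes=52428800 --config retention.ms=1800000
>
> # Step 4: a simple consumer on the topic
> ./kafka-console-consumer.sh --bootstrap-server broker-host:9092 --topic mytopic2
> {code}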
> If replication after restarting node 12 takes long enough, node 11 may crash
> its ReplicaFetcher before the replicas on 12 are available, causing partitions
> to go offline. We have managed to reproduce the issue without deleting log data
> in steps 6 and 8, but it seems more likely to manifest if we do. The broker
> experiencing the issue is fairly random, but most of the time it is one of the
> already restarted brokers, though not necessarily the latest one.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)