Chema Sanchez created KAFKA-6683:
------------------------------------

             Summary: ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
                 Key: KAFKA-6683
                 URL: https://issues.apache.org/jira/browse/KAFKA-6683
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 1.0.0
         Environment: os: GNU/Linux 
arch: x86_64
Kernel: 4.9.77
jvm: OpenJDK 1.8.0
            Reporter: Chema Sanchez
         Attachments: server.properties

We have been experiencing this issue lately when restarting or replacing 
brokers of our Kafka clusters during maintenance operations.

After restarting or replacing a broker, it may perform normally for some minutes and then suddenly throw the following exception and stop replicating some partitions:

{code:none}
[2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
java.lang.IllegalArgumentException: Attempted to complete a transaction which was not started
        at kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
        at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
        at kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
        at scala.collection.immutable.List.foreach(List.scala:389)
        at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
        at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
        at kafka.log.Log.loadProducersFromLog(Log.scala:540)
        at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
        at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.log.Log.loadProducerState(Log.scala:514)
        at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
        at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
        at kafka.log.Log.truncateTo(Log.scala:1467)
        at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
        at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
        at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
        at kafka.log.LogManager.truncateTo(LogManager.scala:445)
        at kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
{code}
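
When this happens, the affected partitions show up as under-replicated in the cluster. A check along these lines makes the symptom visible (the ZooKeeper address is a placeholder for our ensemble):

{code:bash}
# List partitions whose ISR is smaller than the replica set
# (hypothetical ZooKeeper address; substitute your own ensemble).
./kafka-topics.sh --zookeeper zk-XXX.eu-west-1.compute.amazonaws.com:2181 \
  --describe --under-replicated-partitions
{code}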

Since all brokers in a cluster are restarted during system updates, the issue has sometimes manifested at the same time in different brokers holding replicas for the same partition, which caused downtime because those partitions went offline.

It is necessary to restart the faulted broker in order to recover partition replication, but after hitting this issue we often find that the restarted broker shuts itself down with the following error, among lots of warnings due to corrupted indices:

{code:none}
[2018-03-05 16:02:22,450] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current) (kafka.log.LogManager)
[2018-03-05 16:02:22,453] FATAL [KafkaServer id=10] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current)
{code}

When this has happened, the only way to keep Kafka up has been to delete all the data inside the log directory (/var/lib/kafka in our case).
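
A minimal sketch of that workaround (the systemd unit name is an assumption about the broker running as a service; the log directory is the one mentioned above):

{code:bash}
# Stop the broker, wipe its data directory and start it again so it
# re-replicates everything from the current partition leaders.
# "kafka" as a systemd unit name is an assumption; adjust to your setup.
systemctl stop kafka
rm -rf /var/lib/kafka/*
systemctl start kafka
{code}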


The problem manifests randomly, but we managed to reproduce the ReplicaFetcher crash (although not the failed startup) outside our production cluster by doing this:
 1 - Set up a Kafka cluster running 3 brokers (see attached configuration): 10, 11 and 12.
 2 - Create a topic with the following settings (a command sketch is given after this list):
Topic:mytopic2    PartitionCount:12    ReplicationFactor:3    Configs:segment.bytes=52428800,retention.ms=1800000
 3 - Run some producers like this:
{code:bash}
while true
do
 ./kafka-producer-perf-test.sh --topic mytopic2 --record-size=2048 \
   --producer-props bootstrap.servers=ec2-XXX-XXX-XXX-XXX.eu-west-1.compute.amazonaws.com:9092 \
   enable.idempotence=true \
   --throughput 50 --num-records 6000 \
   --transactional-id pruebatrans4 --transaction-duration-ms 100
done
{code}
 4 - Run a consumer on mytopic2 (see the sketch after this list).
 5 - Wait some time for segments to be rotated.
 6 - Stop broker 11, remove everything inside /var/lib/kafka, start it again.
 7 - Wait for data to be replicated and all replicas to be back in the ISR.
 8 - Stop broker 12, remove everything inside /var/lib/kafka, start it again.
 9 - Wait for data to be replicated and all replicas to be back in the ISR.
 10 - Wait for the issue to manifest. If it does, after some minutes of normal behaviour broker 11 may suddenly stop replicating and some partitions may appear under-replicated.
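
For completeness, the topic in step 2 and the consumer in step 4 can be set up with commands along these lines (the ZooKeeper address is a placeholder; any equivalent commands should do):

{code:bash}
# Step 2: create the topic with the settings shown above
# (hypothetical ZooKeeper address).
./kafka-topics.sh --zookeeper zk-XXX.eu-west-1.compute.amazonaws.com:2181 \
  --create --topic mytopic2 --partitions 12 --replication-factor 3 \
  --config segment.bytes=52428800 --config retention.ms=1800000

# Step 4: a simple console consumer on the topic, using the same
# bootstrap server as the producer command in step 3.
./kafka-console-consumer.sh \
  --bootstrap-server ec2-XXX-XXX-XXX-XXX.eu-west-1.compute.amazonaws.com:9092 \
  --topic mytopic2
{code}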


If replication after restarting node 12 takes long enough, node 11 may crash its ReplicaFetcher before the replicas on node 12 are available, causing partitions to go offline.
The broker experiencing the issue is quite random, but most of the time it seems to be one of the already restarted brokers, though not necessarily the latest one.



