lakshmi-manasa-g opened a new pull request #1351:
URL: https://github.com/apache/samza/pull/1351


   **Symptom:** containers dying with ConcurrentModificationException thrown from KafkaConsumer, like
   `org.apache.samza.SamzaException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access...
   Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2204)
   at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2188)
   at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1475)...`
   
   **Cause:** The root cause is that KafkaConsumer is accessed inside
   KafkaConsumerProxy.initializeLags without a synchronized block.
   How it was root-caused: per the KafkaConsumer.acquire code, KafkaConsumer
   tracks the thread currently accessing the object. If another thread tries
   to access it before the current thread has released it, this
   ConcurrentModificationException is thrown. The logs of the failed container
   show that the KafkaConsumerProxy thread (say P) had started before the
   exception. The other thread (say C), on which `KafkaConsumer.commitAsync`
   was invoked, accesses the shared consumer inside a synchronized block
   (like `synchronized(consumer)`). So although the exception was thrown in
   C's code, a thread other than C (P in this case) must have been holding
   access without having entered a synchronized block first. Of all the
   methods in P that access the consumer, initializeLags is the only one
   without a synchronized block.
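
To illustrate the ownership check described above, here is a minimal sketch of a single-thread access guard. It is a simplified model, not Kafka's code: `GuardDemo`, `GuardedClient`, and `secondThreadIsRejected` are hypothetical names, and the sketch omits re-entrant access by the owning thread's nested calls. It shows why the exception surfaces in the second thread (C) even though the first thread (P) is the one holding access.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

final class GuardDemo {
    /** Hypothetical stand-in for a client that allows one accessing thread at a time. */
    static final class GuardedClient {
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        void acquire() {
            Thread me = Thread.currentThread();
            // Fail fast if a different thread currently owns the client.
            if (owner.get() != me && !owner.compareAndSet(null, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
        }

        void release() {
            owner.set(null);
        }
    }

    /** Returns true if a second thread is rejected while thread P holds access. */
    static boolean secondThreadIsRejected() {
        GuardedClient client = new GuardedClient();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        // P takes ownership without any lock and keeps it until told to stop.
        Thread p = new Thread(() -> {
            client.acquire();
            held.countDown();
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                client.release();
            }
        }, "P");
        p.start();

        boolean rejected = false;
        try {
            held.await();
            try {
                // This thread plays the role of C: its call is the one that throws.
                client.acquire();
                client.release();
            } catch (ConcurrentModificationException e) {
                rejected = true;
            }
            done.countDown();
            p.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + secondThreadIsRejected());
    }
}
```

In this model the stack trace would point at the caller that lost the race, which matches the reasoning that the thread named in the exception is not necessarily the one missing synchronization.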
   
   **Changes:** Add a synchronized block around the kafkaConsumer access in
   the initializeLags method.
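
The pattern behind this change can be sketched as follows. This is not the Samza patch itself: `SynchronizedAccessDemo`, `FragileClient`, and `runGuarded` are hypothetical names, and the client is a stand-in that rejects overlapping calls. The sketch assumes every caller locks on the same shared client object, in the style of `synchronized(consumer)`.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class SynchronizedAccessDemo {
    /** Hypothetical stand-in for a client that rejects overlapping calls. */
    static final class FragileClient {
        private final AtomicBoolean inUse = new AtomicBoolean(false);
        private int calls = 0;

        int call() {
            if (!inUse.compareAndSet(false, true)) {
                throw new ConcurrentModificationException("overlapping access");
            }
            try {
                return ++calls;
            } finally {
                inUse.set(false);
            }
        }
    }

    /** Runs two threads that both lock on the client; returns the number of failed calls. */
    static int runGuarded(int callsPerThread) {
        FragileClient client = new FragileClient();
        AtomicInteger failures = new AtomicInteger();

        Runnable work = () -> {
            for (int i = 0; i < callsPerThread; i++) {
                try {
                    // Every access goes through the same monitor, so calls never overlap.
                    synchronized (client) {
                        client.call();
                    }
                } catch (ConcurrentModificationException e) {
                    failures.incrementAndGet();
                }
            }
        };

        Thread p = new Thread(work, "P");
        Thread c = new Thread(work, "C");
        p.start();
        c.start();
        try {
            p.join();
            c.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed calls: " + runGuarded(10_000));
    }
}
```

The protection only holds if every code path that touches the shared object takes the lock; a single unsynchronized method is enough to reintroduce the failure.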
   
   **Tests:** Added a unit test.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

