[ https://issues.apache.org/jira/browse/SAMZA-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes resolved SAMZA-1537.
------------------------------
       Resolution: Fixed
    Fix Version/s: 0.15.0

Issue resolved by pull request 388
[https://github.com/apache/samza/pull/388]

> StreamAppender can deadlock due to locks held by Kafka and Log4j
> ----------------------------------------------------------------
>
>                 Key: SAMZA-1537
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1537
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>             Fix For: 0.15.0
>
>
> The thread dumps of the 2 offending threads are below, but the basics are: 
> 1. AppInfoParser in Kafka uses static synchronized methods
> 2. Log4j synchronizes per Category
> So if the StreamAppender tries to create a new KafkaProducer while holding the 
> Category lock, it blocks on the static synchronized AppInfoParser methods; and 
> if another thread is inside AppInfoParser and tries to log to the same 
> Category, the two threads each hold the lock the other needs and deadlock.
> {noFormat}
> "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
> prio=5 tid=23 BLOCKED
>       at 
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
>          Local Variable: java.lang.String#326563
>          Local Variable: java.lang.String#329864
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
>          Local Variable: java.util.ArrayList#265184
>          Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
>          Local Variable: java.util.LinkedHashMap#991
>          Local Variable: 
> org.apache.kafka.common.internals.ClusterResourceListeners#9
>          Local Variable: java.util.ArrayList#265353
>          Local Variable: org.apache.kafka.clients.NetworkClient#9
>          Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
>          Local Variable: java.util.ArrayList#265374
>          Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
>          Local Variable: java.lang.String#309971
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
>          Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#7
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#8
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#10
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#9
>          Local Variable: java.util.Properties#38
>          Local Variable: 
> com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
>          Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
>          Local Variable: java.lang.String#326561
>          Local Variable: java.lang.IllegalStateException#2
>          Local Variable: java.lang.String#330077
>          Local Variable: org.apache.samza.system.SystemProducerException#4
>          Local Variable: java.lang.Integer#15116
>       at 
> org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
>       at 
> com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
>       at 
> com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
>          Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
>       at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>       at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>          Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
>       at org.apache.log4j.Category.callAppenders(Category.java:206)
>          Local Variable: org.apache.log4j.spi.LoggingEvent#24
>          Local Variable: org.apache.log4j.Logger#4
>       at org.apache.log4j.Category.forcedLog(Category.java:391)
>       at org.apache.log4j.Category.log(Category.java:856)
>       at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
>          Local Variable: java.util.concurrent.TimeUnit$3#1
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
>          Local Variable: 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
>          Local Variable: java.lang.Boolean#1
>          Local Variable: org.apache.samza.system.SystemProducerException#2
>          Local Variable: java.lang.Object#203455
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
>  string>)
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
>          Local Variable: org.apache.kafka.common.errors.TimeoutException#2
>          Local Variable: java.util.ArrayList$Itr#6
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
>          Local Variable: java.util.ArrayList#263984
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator#4
>          Local Variable: java.util.ArrayList$Itr#5
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch#34
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
>          Local Variable: org.apache.kafka.common.Cluster#2
>          Local Variable: java.util.Collections$EmptyMap#1
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
>          Local Variable: java.util.HashMap$KeyIterator#2
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>          Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
> prio=5 tid=35 BLOCKED
>       at org.apache.log4j.Category.callAppenders(Category.java:204)
>          Local Variable: org.apache.log4j.spi.LoggingEvent#26
>          Local Variable: org.apache.log4j.Logger#15
>       at org.apache.log4j.Category.forcedLog(Category.java:391)
>       at org.apache.log4j.Category.log(Category.java:856)
>       at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
>       at 
> org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
>       at 
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
>          Local Variable: javax.management.ObjectName#162
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
>          Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
>          Local Variable: java.util.ArrayList#264895
>          Local Variable: 
> org.apache.kafka.common.internals.ClusterResourceListeners#7
>          Local Variable: java.lang.String#308990
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
>          Local Variable: java.util.LinkedHashMap#854
>          Local Variable: java.util.ArrayList#264889
>          Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
>          Local Variable: java.util.ArrayList#264910
>          Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
>          Local Variable: org.apache.kafka.clients.NetworkClient#7
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#5
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#6
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
>          Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
>          Local Variable: java.util.Properties#67
>          Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
>          Local Variable: 
> com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#14
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#13
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
>          Local Variable: org.apache.samza.system.SystemProducerException#1
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
>          Local Variable: 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
>  string>)
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
>          Local Variable: java.util.ArrayList$Itr#2
>          Local Variable: org.apache.kafka.common.errors.TimeoutException#1
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator#10
>          Local Variable: java.util.ArrayList$Itr#1
>          Local Variable: java.util.ArrayList#263305
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch#21
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
>          Local Variable: org.apache.kafka.common.Cluster#1
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
>          Local Variable: java.util.HashMap$KeyIterator#1
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>          Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
>       at java.lang.Thread.run(Thread.java:745)
> {noFormat}
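The lock inversion visible in the two dumps above can be reproduced in miniature. In this sketch all class and lock names are illustrative stand-ins, not the real Kafka or log4j classes: `CATEGORY_LOCK` plays the per-Category monitor held by `Category.callAppenders()`, and `APP_INFO_LOCK` plays AppInfoParser's class lock (its methods are static synchronized).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class DeadlockSketch {
    // Illustrative stand-ins for the two monitors involved in SAMZA-1537.
    static final Object CATEGORY_LOCK = new Object();
    static final Object APP_INFO_LOCK = new Object();

    // Spawns two threads that take the locks in opposite order, then asks the
    // JVM whether it sees them as deadlocked. Returns true on detection.
    public static boolean demo() throws InterruptedException {
        CountDownLatch bothHeld = new CountDownLatch(2);

        // Thread 1: like StreamAppender.append() - runs under the Category lock,
        // then creates a producer, which needs AppInfoParser's class lock.
        Thread appender = daemon(() -> {
            synchronized (CATEGORY_LOCK) {
                bothHeld.countDown();
                awaitQuietly(bothHeld);
                synchronized (APP_INFO_LOCK) { /* registerAppInfo(...) */ }
            }
        });
        // Thread 2: like AppInfoParser.registerAppInfo() - runs under the class
        // lock, then logs, which needs the same Category lock.
        Thread producerInit = daemon(() -> {
            synchronized (APP_INFO_LOCK) {
                bothHeld.countDown();
                awaitQuietly(bothHeld);
                synchronized (CATEGORY_LOCK) { /* log.info(...) */ }
            }
        });
        appender.start();
        producerInit.start();

        bothHeld.await();  // both outer locks are now held
        Thread.sleep(500); // give the threads time to block on each other

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] deadlocked = mx.findMonitorDeadlockedThreads();
        return deadlocked != null && deadlocked.length == 2;
    }

    static Thread daemon(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo() ? "deadlock detected" : "no deadlock");
    }
}
```

The deadlocked threads are daemons, so the JVM can still exit; in the real job the blocked thread was the Kafka producer network thread, which cannot be sacrificed that way.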
> After some discussion with [~pmaheshwari], we felt making the StreamAppender 
> async was the only reliable solution. There are 2 approaches to this:
> 1. Use log4j2, which supports async logging out of the box. The downside is 
> that we'd have to update the StreamAppender and JmxAppender to be log4j2 
> plugins instead of simply extending AppenderSkeleton.
> 2. Add a queue and a consumer thread to StreamAppender, such that new events 
> are added to the queue with some timeout and the thread consumes from the 
> queue and sends to the configured SystemProducer. 
> Option 2 is favorable right now because it's quicker to implement and test. It 
> can also easily be replaced by option 1 later, which is already a goal because 
> we want to leverage the performance benefits of log4j2. 
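Option 2 could look roughly like the following sketch. The class name, the String event type, and the in-memory "producer" list are illustrative stand-ins, not Samza's actual StreamAppender or SystemProducer API; the point is only the shape of the fix: append() merely enqueues, so no producer work ever happens under the log4j Category lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class AsyncAppenderSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    // Stand-in for the configured SystemProducer.
    private final List<String> sent = new ArrayList<>();

    public AsyncAppenderSketch() {
        // Single consumer thread: the only place that calls into the producer,
        // so producer-side locks can never interleave with the Category lock.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take(); // blocks until an event arrives
                    synchronized (sent) {
                        sent.add(event); // "send" to the producer
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
    }

    // Called from the logging thread: enqueue with a bounded wait so a full
    // queue degrades to a dropped event instead of a blocked logger.
    public boolean append(String event) throws InterruptedException {
        return queue.offer(event, 100, TimeUnit.MILLISECONDS);
    }

    public List<String> sentSoFar() {
        synchronized (sent) {
            return new ArrayList<>(sent);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncAppenderSketch appender = new AsyncAppenderSketch();
        appender.append("hello");
        Thread.sleep(200); // let the consumer drain the queue
        System.out.println(appender.sentSoFar());
    }
}
```

The bounded offer() timeout is the key trade-off: under sustained producer failure logging degrades (events are dropped) rather than wedging every thread that logs to the same Category.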



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)