Jake Maes created SAMZA-1537:
--------------------------------

             Summary: StreamAppender can deadlock due to locks held by Kafka 
and Log4j
                 Key: SAMZA-1537
                 URL: https://issues.apache.org/jira/browse/SAMZA-1537
             Project: Samza
          Issue Type: Bug
            Reporter: Jake Maes
            Assignee: Jake Maes


The thread dumps of the 2 offending threads are below, but the basics are: 
1. AppInfoParser in kafka uses static synchronized methods
2. Log4j synchronizes per Category

So if the StreamAppender tries create a new KafkaProducer, which calls the 
static sync AppInfoParser thread, which then tries to log to the same Category

{noFormat}
"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
prio=5 tid=23 BLOCKED
        at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
           Local Variable: java.lang.String#326563
           Local Variable: java.lang.String#329864
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
           Local Variable: java.util.ArrayList#265184
           Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
           Local Variable: java.util.LinkedHashMap#991
           Local Variable: 
org.apache.kafka.common.internals.ClusterResourceListeners#9
           Local Variable: java.util.ArrayList#265353
           Local Variable: org.apache.kafka.clients.NetworkClient#9
           Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
           Local Variable: java.util.ArrayList#265374
           Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
           Local Variable: java.lang.String#309971
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
           Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#7
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#8
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#10
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#9
           Local Variable: java.util.Properties#38
           Local Variable: 
com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
           Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
           Local Variable: java.lang.String#326561
           Local Variable: java.lang.IllegalStateException#2
           Local Variable: java.lang.String#330077
           Local Variable: org.apache.samza.system.SystemProducerException#4
           Local Variable: java.lang.Integer#15116
        at 
org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
        at 
com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
        at 
com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
           Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
        at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
        at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
           Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
        at org.apache.log4j.Category.callAppenders(Category.java:206)
           Local Variable: org.apache.log4j.spi.LoggingEvent#24
           Local Variable: org.apache.log4j.Logger#4
        at org.apache.log4j.Category.forcedLog(Category.java:391)
        at org.apache.log4j.Category.log(Category.java:856)
        at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
           Local Variable: java.util.concurrent.TimeUnit$3#1
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
           Local Variable: 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
           Local Variable: java.lang.Boolean#1
           Local Variable: org.apache.samza.system.SystemProducerException#2
           Local Variable: java.lang.Object#203455
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
 string>)
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
           Local Variable: org.apache.kafka.common.errors.TimeoutException#2
           Local Variable: java.util.ArrayList$Itr#6
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
           Local Variable: java.util.ArrayList#263984
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator#4
           Local Variable: java.util.ArrayList$Itr#5
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch#34
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
           Local Variable: org.apache.kafka.common.Cluster#2
           Local Variable: java.util.Collections$EmptyMap#1
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
           Local Variable: java.util.HashMap$KeyIterator#2
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
           Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
        at java.lang.Thread.run(Thread.java:745)


"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
prio=5 tid=35 BLOCKED
        at org.apache.log4j.Category.callAppenders(Category.java:204)
           Local Variable: org.apache.log4j.spi.LoggingEvent#26
           Local Variable: org.apache.log4j.Logger#15
        at org.apache.log4j.Category.forcedLog(Category.java:391)
        at org.apache.log4j.Category.log(Category.java:856)
        at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
        at 
org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
        at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
           Local Variable: javax.management.ObjectName#162
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
           Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
           Local Variable: java.util.ArrayList#264895
           Local Variable: 
org.apache.kafka.common.internals.ClusterResourceListeners#7
           Local Variable: java.lang.String#308990
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
           Local Variable: java.util.LinkedHashMap#854
           Local Variable: java.util.ArrayList#264889
           Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
           Local Variable: java.util.ArrayList#264910
           Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
           Local Variable: org.apache.kafka.clients.NetworkClient#7
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#5
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#6
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
           Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
           Local Variable: java.util.Properties#67
           Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
           Local Variable: 
com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#14
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#13
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
           Local Variable: org.apache.samza.system.SystemProducerException#1
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
           Local Variable: 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
 string>)
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
           Local Variable: java.util.ArrayList$Itr#2
           Local Variable: org.apache.kafka.common.errors.TimeoutException#1
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator#10
           Local Variable: java.util.ArrayList$Itr#1
           Local Variable: java.util.ArrayList#263305
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch#21
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
           Local Variable: org.apache.kafka.common.Cluster#1
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
           Local Variable: java.util.HashMap$KeyIterator#1
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
           Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
        at java.lang.Thread.run(Thread.java:745)
{noFormat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to