Jake Maes created SAMZA-1537:
--------------------------------
Summary: StreamAppender can deadlock due to locks held by Kafka
and Log4j
Key: SAMZA-1537
URL: https://issues.apache.org/jira/browse/SAMZA-1537
Project: Samza
Issue Type: Bug
Reporter: Jake Maes
Assignee: Jake Maes
The thread dumps of the 2 offending threads are below, but the basics are:
1. AppInfoParser in Kafka uses static synchronized methods
2. Log4j synchronizes per Category
So if the StreamAppender tries to create a new KafkaProducer, which calls the
static synchronized AppInfoParser method, which then tries to log to the same
Category, the two threads can deadlock.
{noFormat}
"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon
prio=5 tid=23 BLOCKED
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
Local Variable: java.lang.String#326563
Local Variable: java.lang.String#329864
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
Local Variable:
org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
Local Variable: java.util.ArrayList#265184
Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
Local Variable: java.util.LinkedHashMap#991
Local Variable:
org.apache.kafka.common.internals.ClusterResourceListeners#9
Local Variable: java.util.ArrayList#265353
Local Variable: org.apache.kafka.clients.NetworkClient#9
Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
Local Variable: java.util.ArrayList#265374
Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
Local Variable: java.lang.String#309971
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
Local Variable:
org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#7
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#8
at
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
at
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#10
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#9
Local Variable: java.util.Properties#38
Local Variable:
com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
Local Variable: java.lang.String#326561
Local Variable: java.lang.IllegalStateException#2
Local Variable: java.lang.String#330077
Local Variable: org.apache.samza.system.SystemProducerException#4
Local Variable: java.lang.Integer#15116
at
org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
at
com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
at
com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
at org.apache.log4j.Category.callAppenders(Category.java:206)
Local Variable: org.apache.log4j.spi.LoggingEvent#24
Local Variable: org.apache.log4j.Logger#4
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
at
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
Local Variable: java.util.concurrent.TimeUnit$3#1
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
Local Variable:
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
Local Variable: java.lang.Boolean#1
Local Variable: org.apache.samza.system.SystemProducerException#2
Local Variable: java.lang.Object#203455
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
string>)
at
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
at
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
Local Variable:
org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
Local Variable: org.apache.kafka.common.errors.TimeoutException#2
Local Variable: java.util.ArrayList$Itr#6
at
org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
Local Variable: java.util.ArrayList#263984
Local Variable:
org.apache.kafka.clients.producer.internals.RecordAccumulator#4
Local Variable: java.util.ArrayList$Itr#5
Local Variable:
org.apache.kafka.clients.producer.internals.RecordBatch#34
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
Local Variable: org.apache.kafka.common.Cluster#2
Local Variable: java.util.Collections$EmptyMap#1
Local Variable:
org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
Local Variable: java.util.HashMap$KeyIterator#2
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
at java.lang.Thread.run(Thread.java:745)
"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon
prio=5 tid=35 BLOCKED
at org.apache.log4j.Category.callAppenders(Category.java:204)
Local Variable: org.apache.log4j.spi.LoggingEvent#26
Local Variable: org.apache.log4j.Logger#15
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
at
org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
Local Variable: javax.management.ObjectName#162
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
Local Variable: java.util.ArrayList#264895
Local Variable:
org.apache.kafka.common.internals.ClusterResourceListeners#7
Local Variable: java.lang.String#308990
Local Variable:
org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
Local Variable: java.util.LinkedHashMap#854
Local Variable: java.util.ArrayList#264889
Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
Local Variable: java.util.ArrayList#264910
Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
Local Variable: org.apache.kafka.clients.NetworkClient#7
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#5
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#6
Local Variable:
org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
at
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
at
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
Local Variable: java.util.Properties#67
Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
Local Variable:
com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#14
Local Variable:
org.apache.kafka.common.serialization.ByteArraySerializer#13
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
Local Variable: org.apache.samza.system.SystemProducerException#1
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
Local Variable:
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
at
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
string>)
at
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
Local Variable:
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
at
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
Local Variable:
org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
Local Variable: java.util.ArrayList$Itr#2
Local Variable: org.apache.kafka.common.errors.TimeoutException#1
at
org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
Local Variable:
org.apache.kafka.clients.producer.internals.RecordAccumulator#10
Local Variable: java.util.ArrayList$Itr#1
Local Variable: java.util.ArrayList#263305
Local Variable:
org.apache.kafka.clients.producer.internals.RecordBatch#21
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
Local Variable: org.apache.kafka.common.Cluster#1
Local Variable:
org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
Local Variable: java.util.HashMap$KeyIterator#1
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
at java.lang.Thread.run(Thread.java:745)
{noFormat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)