[
https://issues.apache.org/jira/browse/SAMZA-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jake Maes updated SAMZA-1537:
-----------------------------
Description:
The thread dumps of the two offending threads are below, but the basics are:
1. AppInfoParser in Kafka uses static synchronized methods
2. Log4j synchronizes per Category
So if the StreamAppender tries to create a new KafkaProducer, it calls the
static synchronized AppInfoParser methods, which in turn try to log to the
same Category, and the two threads deadlock.
{noFormat}
"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=23 BLOCKED
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
        Local Variable: java.lang.String#326563
        Local Variable: java.lang.String#329864
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
        Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
        Local Variable: java.util.ArrayList#265184
        Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
        Local Variable: java.util.LinkedHashMap#991
        Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#9
        Local Variable: java.util.ArrayList#265353
        Local Variable: org.apache.kafka.clients.NetworkClient#9
        Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
        Local Variable: java.util.ArrayList#265374
        Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
        Local Variable: java.lang.String#309971
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
        Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
        Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#7
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#8
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#10
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#9
        Local Variable: java.util.Properties#38
        Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
        Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
        Local Variable: java.lang.String#326561
        Local Variable: java.lang.IllegalStateException#2
        Local Variable: java.lang.String#330077
        Local Variable: org.apache.samza.system.SystemProducerException#4
        Local Variable: java.lang.Integer#15116
    at org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
    at com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
    at com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
        Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
        Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
    at org.apache.log4j.Category.callAppenders(Category.java:206)
        Local Variable: org.apache.log4j.spi.LoggingEvent#24
        Local Variable: org.apache.log4j.Logger#4
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
        Local Variable: java.util.concurrent.TimeUnit$3#1
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
        Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
        Local Variable: java.lang.Boolean#1
        Local Variable: org.apache.samza.system.SystemProducerException#2
        Local Variable: java.lang.Object#203455
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
        Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
        Local Variable: org.apache.kafka.common.errors.TimeoutException#2
        Local Variable: java.util.ArrayList$Itr#6
    at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
        Local Variable: java.util.ArrayList#263984
        Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#4
        Local Variable: java.util.ArrayList$Itr#5
        Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#34
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
        Local Variable: org.apache.kafka.common.Cluster#2
        Local Variable: java.util.Collections$EmptyMap#1
        Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
        Local Variable: java.util.HashMap$KeyIterator#2
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
        Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
    at java.lang.Thread.run(Thread.java:745)
"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=35 BLOCKED
    at org.apache.log4j.Category.callAppenders(Category.java:204)
        Local Variable: org.apache.log4j.spi.LoggingEvent#26
        Local Variable: org.apache.log4j.Logger#15
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
    at org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
        Local Variable: javax.management.ObjectName#162
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
        Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
        Local Variable: java.util.ArrayList#264895
        Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#7
        Local Variable: java.lang.String#308990
        Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
        Local Variable: java.util.LinkedHashMap#854
        Local Variable: java.util.ArrayList#264889
        Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
        Local Variable: java.util.ArrayList#264910
        Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
        Local Variable: org.apache.kafka.clients.NetworkClient#7
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#5
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#6
        Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
        Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
        Local Variable: java.util.Properties#67
        Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
        Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#14
        Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#13
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
        Local Variable: org.apache.samza.system.SystemProducerException#1
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
        Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
        Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
        Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
        Local Variable: java.util.ArrayList$Itr#2
        Local Variable: org.apache.kafka.common.errors.TimeoutException#1
    at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
        Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#10
        Local Variable: java.util.ArrayList$Itr#1
        Local Variable: java.util.ArrayList#263305
        Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#21
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
        Local Variable: org.apache.kafka.common.Cluster#1
        Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
        Local Variable: java.util.HashMap$KeyIterator#1
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
        Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
    at java.lang.Thread.run(Thread.java:745)
{noFormat}
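The cycle in the dumps can be reproduced in miniature. The sketch below is purely illustrative (the locks are stand-ins for the AppInfoParser class monitor and the log4j Category monitor, not the real objects): one thread holds the "AppInfoParser" lock and then tries to log, while the other is inside a Category and tries to create a producer. Timed tryLock is used so the demo reports the inversion instead of actually hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockInversionDemo {

    public static boolean demo() throws InterruptedException {
        ReentrantLock appInfoLock = new ReentrantLock();   // stands in for AppInfoParser's static lock
        ReentrantLock categoryLock = new ReentrantLock();  // stands in for the log4j Category monitor
        CountDownLatch workerHoldsAppInfo = new CountDownLatch(1);
        CountDownLatch mainDone = new CountDownLatch(1);
        boolean[] workerBlocked = new boolean[1];

        // Worker mimics tid=35: inside static synchronized registerAppInfo,
        // then tries to log, which needs the Category lock.
        Thread worker = new Thread(() -> {
            appInfoLock.lock();
            try {
                workerHoldsAppInfo.countDown();
                workerBlocked[0] = !tryFor(categoryLock, 150);
                mainDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                appInfoLock.unlock();
            }
        });

        // Main mimics tid=23: inside Category.callAppenders, the StreamAppender
        // recreates the producer, which needs the AppInfoParser lock.
        categoryLock.lock();
        boolean mainBlocked;
        try {
            worker.start();
            workerHoldsAppInfo.await();
            mainBlocked = !tryFor(appInfoLock, 400);
        } finally {
            mainDone.countDown();
            categoryLock.unlock();
        }
        worker.join();
        return mainBlocked && workerBlocked[0]; // each thread blocked on the other's lock
    }

    private static boolean tryFor(ReentrantLock lock, long millis) {
        try {
            if (lock.tryLock(millis, TimeUnit.MILLISECONDS)) {
                lock.unlock();
                return false; // acquired: no contention
            }
            return true;      // timed out: held by the other thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
        }
    }
}
```

With real monitors and no timeouts, both acquisitions block forever, which is exactly the BLOCKED state in the two dumps.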
After some discussion with [~pmaheshwari], we felt making the StreamAppender
async was the only reliable solution. There are two approaches:
1. Use log4j2, which supports async logging out of the box. The downside is
that we'd have to update the StreamAppender and JmxAppender to be log4j2
plugins instead of simply extending AppenderSkeleton.
2. Add a queue and a consumer thread to StreamAppender, such that new events
are added to the queue with some timeout and the thread consumes from the
queue and sends to the configured SystemProducer.
Option 2 is favorable right now because it's quicker to implement and test.
It can also easily be replaced later by option 1, which is already a goal
because we want to leverage the performance benefits of log4j2.
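Option 2 can be sketched roughly as below. This is not the actual Samza patch; the class name, queue size, and timeouts are all hypothetical. The key property is that the logging thread only does a bounded-wait enqueue, so it never touches Kafka's locks while holding the Category monitor; all blocking producer work happens on a single dedicated thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of an async appender core (names are illustrative).
public class AsyncQueueAppender {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
    private final Thread consumer;
    private volatile boolean running = true;

    public AsyncQueueAppender(java.util.function.Consumer<String> producerSend) {
        // Single background thread drains the queue and performs the
        // (potentially blocking) SystemProducer.send equivalent.
        consumer = new Thread(() -> {
            while (running || !queue.isEmpty()) {
                try {
                    String event = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        producerSend.accept(event);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "stream-appender-consumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    // Called from arbitrary logging threads (i.e. from append()). The bounded
    // offer means a stuck producer can only delay a log call briefly and drop
    // the event, never deadlock the caller.
    public boolean append(String event) throws InterruptedException {
        return queue.offer(event, 10, TimeUnit.MILLISECONDS);
    }

    public void stop() throws InterruptedException {
        running = false;
        consumer.join(1000);
    }
}
```

A real implementation would also need to count dropped events and flush the queue on shutdown, but the bounded offer plus single consumer thread is the essence of the fix.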
> StreamAppender can deadlock due to locks held by Kafka and Log4j
> ----------------------------------------------------------------
>
> Key: SAMZA-1537
> URL: https://issues.apache.org/jira/browse/SAMZA-1537
> Project: Samza
> Issue Type: Bug
> Reporter: Jake Maes
> Assignee: Jake Maes
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)