[ 
https://issues.apache.org/jira/browse/SAMZA-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jake Maes updated SAMZA-1537:
-----------------------------
    Description: 
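The thread dumps of the 2 offending threads are below, but the basics are: 
1. AppInfoParser in Kafka uses static synchronized methods
2. Log4j synchronizes per Category

So if the StreamAppender tries to create a new KafkaProducer, the constructor 
calls the static synchronized AppInfoParser.registerAppInfo method, which then 
tries to log to the same Category. If another thread already holds the 
AppInfoParser lock and is itself waiting to log to that Category, the two 
threads deadlock.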

{noFormat}
"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
prio=5 tid=23 BLOCKED
        at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
           Local Variable: java.lang.String#326563
           Local Variable: java.lang.String#329864
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
           Local Variable: java.util.ArrayList#265184
           Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
           Local Variable: java.util.LinkedHashMap#991
           Local Variable: 
org.apache.kafka.common.internals.ClusterResourceListeners#9
           Local Variable: java.util.ArrayList#265353
           Local Variable: org.apache.kafka.clients.NetworkClient#9
           Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
           Local Variable: java.util.ArrayList#265374
           Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
           Local Variable: java.lang.String#309971
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
           Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#7
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#8
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#10
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#9
           Local Variable: java.util.Properties#38
           Local Variable: 
com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
           Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
           Local Variable: java.lang.String#326561
           Local Variable: java.lang.IllegalStateException#2
           Local Variable: java.lang.String#330077
           Local Variable: org.apache.samza.system.SystemProducerException#4
           Local Variable: java.lang.Integer#15116
        at 
org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
        at 
com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
        at 
com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
           Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
        at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
        at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
           Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
        at org.apache.log4j.Category.callAppenders(Category.java:206)
           Local Variable: org.apache.log4j.spi.LoggingEvent#24
           Local Variable: org.apache.log4j.Logger#4
        at org.apache.log4j.Category.forcedLog(Category.java:391)
        at org.apache.log4j.Category.log(Category.java:856)
        at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
           Local Variable: java.util.concurrent.TimeUnit$3#1
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
           Local Variable: 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
           Local Variable: java.lang.Boolean#1
           Local Variable: org.apache.samza.system.SystemProducerException#2
           Local Variable: java.lang.Object#203455
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
 string>)
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
           Local Variable: org.apache.kafka.common.errors.TimeoutException#2
           Local Variable: java.util.ArrayList$Itr#6
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
           Local Variable: java.util.ArrayList#263984
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator#4
           Local Variable: java.util.ArrayList$Itr#5
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch#34
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
           Local Variable: org.apache.kafka.common.Cluster#2
           Local Variable: java.util.Collections$EmptyMap#1
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
           Local Variable: java.util.HashMap$KeyIterator#2
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
           Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
        at java.lang.Thread.run(Thread.java:745)


"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
prio=5 tid=35 BLOCKED
        at org.apache.log4j.Category.callAppenders(Category.java:204)
           Local Variable: org.apache.log4j.spi.LoggingEvent#26
           Local Variable: org.apache.log4j.Logger#15
        at org.apache.log4j.Category.forcedLog(Category.java:391)
        at org.apache.log4j.Category.log(Category.java:856)
        at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
        at 
org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
        at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
           Local Variable: javax.management.ObjectName#162
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
           Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
           Local Variable: java.util.ArrayList#264895
           Local Variable: 
org.apache.kafka.common.internals.ClusterResourceListeners#7
           Local Variable: java.lang.String#308990
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
           Local Variable: java.util.LinkedHashMap#854
           Local Variable: java.util.ArrayList#264889
           Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
           Local Variable: java.util.ArrayList#264910
           Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
           Local Variable: org.apache.kafka.clients.NetworkClient#7
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#5
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#6
           Local Variable: 
org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
           Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
           Local Variable: java.util.Properties#67
           Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
           Local Variable: 
com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#14
           Local Variable: 
org.apache.kafka.common.serialization.ByteArraySerializer#13
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
           Local Variable: org.apache.samza.system.SystemProducerException#1
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
           Local Variable: 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
        at 
com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
 string>)
        at 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
           Local Variable: 
com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
           Local Variable: java.util.ArrayList$Itr#2
           Local Variable: org.apache.kafka.common.errors.TimeoutException#1
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator#10
           Local Variable: java.util.ArrayList$Itr#1
           Local Variable: java.util.ArrayList#263305
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordBatch#21
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
           Local Variable: org.apache.kafka.common.Cluster#1
           Local Variable: 
org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
           Local Variable: java.util.HashMap$KeyIterator#1
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
           Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
        at java.lang.Thread.run(Thread.java:745)
{noFormat}
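
The lock cycle in the two dumps above can be reproduced in miniature. This is 
only a sketch: the class and method names are made up for illustration, and 
ReentrantLock objects stand in for the AppInfoParser class monitor and the 
log4j Category monitor. tryLock with a timeout is used instead of a plain 
lock so the demo terminates rather than hanging like the real threads did.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo of the lock-order inversion; not Samza, Kafka or log4j code. */
final class LockOrderDemo {

    /** Builds a thread that holds `first`, then tries to take `second`. */
    static Thread worker(ReentrantLock first, ReentrantLock second,
                         CountDownLatch bothHoldFirst, CountDownLatch bothTried,
                         AtomicBoolean acquired) {
        return new Thread(() -> {
            first.lock();
            try {
                // Wait until both threads hold their first lock.
                bothHoldFirst.countDown();
                bothHoldFirst.await();
                // A plain lock() here would block forever, as in the dumps.
                boolean got = second.tryLock(200, TimeUnit.MILLISECONDS);
                if (got) {
                    second.unlock();
                }
                acquired.set(got);
                // Keep holding `first` until the other thread has tried too.
                bothTried.countDown();
                bothTried.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                first.unlock();
            }
        });
    }

    /** Returns true if either thread managed to take its second lock. */
    static boolean eitherSucceeded() {
        ReentrantLock appInfoLock = new ReentrantLock();   // stand-in for AppInfoParser.class
        ReentrantLock categoryLock = new ReentrantLock();  // stand-in for the log4j Category
        CountDownLatch hold = new CountDownLatch(2);
        CountDownLatch tried = new CountDownLatch(2);
        AtomicBoolean a = new AtomicBoolean();
        AtomicBoolean b = new AtomicBoolean();
        // Like tid=23: inside callAppenders, then wants registerAppInfo.
        Thread t1 = worker(categoryLock, appInfoLock, hold, tried, a);
        // Like tid=35: inside registerAppInfo, then wants callAppenders.
        Thread t2 = worker(appInfoLock, categoryLock, hold, tried, b);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return a.get() || b.get();
    }
}
```

Calling LockOrderDemo.eitherSucceeded() is expected to report that neither 
thread obtained its second lock, which is the state the two BLOCKED threads 
are stuck in.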

After some discussion with [~pmaheshwari], we felt making the StreamAppender 
async was the only reliable solution. There are 2 approaches to this:
1. Use log4j2, which has built-in support for async logging. The downside is 
that we'd have to update the StreamAppender and JmxAppender to be log4j2 
plugins instead of simply extending AppenderSkeleton.
2. Add a queue and thread to StreamAppender such that new events are added to 
the queue with some timeout and the thread consumes from the queue and sends 
to the configured SystemProducer. 

Option 2 is preferable right now because it's quicker to implement and test. 
It can also be easily replaced by option 1, which is already a goal because we 
want to leverage the performance benefits of log4j2. 
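
A rough sketch of the queue-and-thread idea in option 2 follows. It is not 
the actual StreamAppender change: AsyncEventForwarder, its constructor 
parameters and the 100 ms poll interval are all assumptions made for 
illustration, and a plain Consumer stands in for the SystemProducer send. 
The point is that the logging thread only does a bounded offer and never 
calls into the producer while holding the log4j lock.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: hands events to a background thread via a bounded queue. */
final class AsyncEventForwarder<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final long offerTimeoutMs;
    private final Thread worker;
    private volatile boolean running = true;

    AsyncEventForwarder(int capacity, long offerTimeoutMs, Consumer<T> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.offerTimeoutMs = offerTimeoutMs;
        this.worker = new Thread(() -> {
            // Keep draining until close() has been called and the queue is empty.
            while (running || !queue.isEmpty()) {
                try {
                    T event = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        sink.accept(event); // the real appender would send to the SystemProducer
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // Deliberately not logged: logging here could re-enter the appender.
                }
            }
        }, "async-event-forwarder");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /** Called on the logging thread; waits at most offerTimeoutMs, then drops the event. */
    boolean enqueue(T event) {
        try {
            return queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Stops accepting work and waits up to 5 seconds for the queue to drain. */
    @Override
    public void close() {
        running = false;
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In an appender built this way, append() would call enqueue(event) and return 
immediately. A false return means the queue stayed full for the whole timeout, 
so the caller has to decide whether to drop the event or count it in a metric.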


> StreamAppender can deadlock due to locks held by Kafka and Log4j
> ----------------------------------------------------------------
>
>                 Key: SAMZA-1537
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1537
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>
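> The thread dumps of the 2 offending threads are below, but the basics are: 
> 1. AppInfoParser in Kafka uses static synchronized methods
> 2. Log4j synchronizes per Category
> So if the StreamAppender tries to create a new KafkaProducer, the constructor 
> calls the static synchronized AppInfoParser.registerAppInfo method, which then 
> tries to log to the same Category. If another thread already holds the 
> AppInfoParser lock and is itself waiting to log to that Category, the two 
> threads deadlock.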
> {noFormat}
> "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
> prio=5 tid=23 BLOCKED
>       at 
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
>          Local Variable: java.lang.String#326563
>          Local Variable: java.lang.String#329864
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
>          Local Variable: java.util.ArrayList#265184
>          Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
>          Local Variable: java.util.LinkedHashMap#991
>          Local Variable: 
> org.apache.kafka.common.internals.ClusterResourceListeners#9
>          Local Variable: java.util.ArrayList#265353
>          Local Variable: org.apache.kafka.clients.NetworkClient#9
>          Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
>          Local Variable: java.util.ArrayList#265374
>          Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
>          Local Variable: java.lang.String#309971
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
>          Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#7
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#8
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#10
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#9
>          Local Variable: java.util.Properties#38
>          Local Variable: 
> com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
>          Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
>          Local Variable: java.lang.String#326561
>          Local Variable: java.lang.IllegalStateException#2
>          Local Variable: java.lang.String#330077
>          Local Variable: org.apache.samza.system.SystemProducerException#4
>          Local Variable: java.lang.Integer#15116
>       at 
> org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
>       at 
> com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
>       at 
> com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
>          Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
>       at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>       at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>          Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
>       at org.apache.log4j.Category.callAppenders(Category.java:206)
>          Local Variable: org.apache.log4j.spi.LoggingEvent#24
>          Local Variable: org.apache.log4j.Logger#4
>       at org.apache.log4j.Category.forcedLog(Category.java:391)
>       at org.apache.log4j.Category.log(Category.java:856)
>       at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
>          Local Variable: java.util.concurrent.TimeUnit$3#1
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
>          Local Variable: 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
>          Local Variable: java.lang.Boolean#1
>          Local Variable: org.apache.samza.system.SystemProducerException#2
>          Local Variable: java.lang.Object#203455
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
>  string>)
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
>          Local Variable: org.apache.kafka.common.errors.TimeoutException#2
>          Local Variable: java.util.ArrayList$Itr#6
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
>          Local Variable: java.util.ArrayList#263984
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator#4
>          Local Variable: java.util.ArrayList$Itr#5
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch#34
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
>          Local Variable: org.apache.kafka.common.Cluster#2
>          Local Variable: java.util.Collections$EmptyMap#1
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
>          Local Variable: java.util.HashMap$KeyIterator#2
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>          Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon 
> prio=5 tid=35 BLOCKED
>       at org.apache.log4j.Category.callAppenders(Category.java:204)
>          Local Variable: org.apache.log4j.spi.LoggingEvent#26
>          Local Variable: org.apache.log4j.Logger#15
>       at org.apache.log4j.Category.forcedLog(Category.java:391)
>       at org.apache.log4j.Category.log(Category.java:856)
>       at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
>       at 
> org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
>       at 
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
>          Local Variable: javax.management.ObjectName#162
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
>          Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
>          Local Variable: java.util.ArrayList#264895
>          Local Variable: 
> org.apache.kafka.common.internals.ClusterResourceListeners#7
>          Local Variable: java.lang.String#308990
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
>          Local Variable: java.util.LinkedHashMap#854
>          Local Variable: java.util.ArrayList#264889
>          Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
>          Local Variable: java.util.ArrayList#264910
>          Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
>          Local Variable: org.apache.kafka.clients.NetworkClient#7
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#5
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#6
>          Local Variable: 
> org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
>          Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
>          Local Variable: java.util.Properties#67
>          Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
>          Local Variable: 
> com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#14
>          Local Variable: 
> org.apache.kafka.common.serialization.ByteArraySerializer#13
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
>          Local Variable: org.apache.samza.system.SystemProducerException#1
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
>          Local Variable: 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
>       at 
> com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown
>  string>)
>       at 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
>          Local Variable: 
> com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
>          Local Variable: java.util.ArrayList$Itr#2
>          Local Variable: org.apache.kafka.common.errors.TimeoutException#1
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator#10
>          Local Variable: java.util.ArrayList$Itr#1
>          Local Variable: java.util.ArrayList#263305
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordBatch#21
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
>          Local Variable: org.apache.kafka.common.Cluster#1
>          Local Variable: 
> org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
>          Local Variable: java.util.HashMap$KeyIterator#1
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>          Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
>       at java.lang.Thread.run(Thread.java:745)
> {noFormat}
> After some discussion with [~pmaheshwari], we felt making the StreamAppender 
> async was the only reliable solution. There are 2 approaches to this:
> 1. Use log4j2, which supports asynchronous logging. The downside is that we'd 
> have to update the StreamAppender and JmxAppender to be log4j2 plugins 
> instead of simply extending AppenderSkeleton.
> 2. Add a queue and a thread to StreamAppender, such that new events are added 
> to the queue with some timeout, and the thread consumes from the queue and 
> sends to the configured SystemProducer. 
> Option 2 is favorable right now because it's quicker to implement and test. 
> It can also be easily replaced by option 1, which is already a goal because 
> we want to leverage the performance benefits of log4j2. 
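
The queue-and-thread approach in option 2 could look roughly like the sketch below. It is only an illustration under stated assumptions, not the actual StreamAppender change: the class name AsyncEventForwarder, the String event type, and the Consumer sink (standing in for a send to the configured SystemProducer) are all hypothetical, and log4j is deliberately left out so the example depends only on the JDK. The point it shows is that the logging thread only ever does a bounded-time offer to the queue, so it never runs producer code while holding the Category lock.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of option 2; not the real StreamAppender code. */
final class AsyncEventForwarder implements AutoCloseable {
    private final BlockingQueue<String> queue;
    private final Consumer<String> sink;   // assumption: stands in for SystemProducer.send
    private final long offerTimeoutMs;
    private final Thread worker;
    private volatile boolean running = true;

    AsyncEventForwarder(Consumer<String> sink, int capacity, long offerTimeoutMs) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.sink = sink;
        this.offerTimeoutMs = offerTimeoutMs;
        this.worker = new Thread(this::drain, "async-event-forwarder");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /**
     * Called on the logging thread. Waits at most offerTimeoutMs for space
     * in the queue and returns false if the event had to be dropped.
     */
    boolean append(String event) {
        try {
            return queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Worker loop: the only place where the sink (producer) is invoked. */
    private void drain() {
        try {
            // Keep going after close() until the queue has been emptied.
            while (running || !queue.isEmpty()) {
                String event = queue.poll(50, TimeUnit.MILLISECONDS);
                if (event == null) {
                    continue;
                }
                try {
                    sink.accept(event);
                } catch (RuntimeException e) {
                    // A failing sink drops this event but keeps the worker alive.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops accepting work, flushes what is queued, and waits for the worker. */
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch a full queue causes events to be dropped after the timeout rather than blocking the caller indefinitely; whether dropping log events is acceptable is a design decision the description above leaves open.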



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
