[ 
https://issues.apache.org/jira/browse/SAMZA-1371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155729#comment-16155729
 ] 

Hao Song commented on SAMZA-1371:
---------------------------------

Full thread dump OpenJDK 64-Bit Server VM (25.141-b16 mixed mode):

"Attach Listener" #40 daemon prio=9 os_prio=0 tid=0x00007f97a81bf800 nid=0x2bed 
waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"StatsD-pool-1-thread-1" #36 daemon prio=5 os_prio=0 tid=0x00007f97a8271800 
nid=0x12fe waiting on condition [0x00007f979e220000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000ba59a7b0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <kafka-broker-dns>:9092 for 
client samza_consumer-<App name>" #35 daemon prio=5 os_prio=0 
tid=0x00007f97c6116000 nid=0xf32 runnable [0x00007f979e322000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.Net.poll(Native Method)
        at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
        - locked <0x00000000ba5d4e50> (a java.lang.Object)
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
        - locked <0x00000000ba5d4ea0> (a java.lang.Object)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        - locked <0x00000000ba5d4ee0> (a 
sun.nio.ch.SocketAdaptor$SocketInputStream)
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        - locked <0x00000000ba5d4e60> (a java.lang.Object)
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
        - locked <0x00000000ba5d5098> (a java.lang.Object)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
        at 
org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
        at 
org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
        at 
org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
        at 
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <kafka-broker-dns>:9092 for 
client samza_consumer-<App name>" #34 daemon prio=5 os_prio=0 
tid=0x00007f97c587f000 nid=0xf31 runnable [0x00007f979ea25000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.Net.poll(Native Method)
        at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
        - locked <0x00000000ba639bb8> (a java.lang.Object)
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
        - locked <0x00000000ba639b88> (a java.lang.Object)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        - locked <0x00000000ba637ab0> (a 
sun.nio.ch.SocketAdaptor$SocketInputStream)
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        - locked <0x00000000ba639ae0> (a java.lang.Object)
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
        - locked <0x00000000ba637910> (a java.lang.Object)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
        at 
org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
        at 
org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
        at 
org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
        at 
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <kafka-broker-dns>:9092 for 
client samza_consumer-<App name>" #33 daemon prio=5 os_prio=0 
tid=0x00007f97c651b000 nid=0xf30 runnable [0x00007f979e924000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.Net.poll(Native Method)
        at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
        - locked <0x00000000ba6499f8> (a java.lang.Object)
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
        - locked <0x00000000ba649a48> (a java.lang.Object)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        - locked <0x00000000ba649a88> (a 
sun.nio.ch.SocketAdaptor$SocketInputStream)
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        - locked <0x00000000ba649a08> (a java.lang.Object)
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
        - locked <0x00000000ba649c40> (a java.lang.Object)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
        at 
org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
        at 
org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
        at 
org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
        at 
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"kafka-producer-network-thread | 
samza_producer-shadow_ListingAlertsEmailOrganizerInstant-1" #32 daemon prio=5 
os_prio=0 tid=0x00007f97c613e000 nid=0xf16 runnable [0x00007f97a0747000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000000ba59aaa8> (a sun.nio.ch.Util$3)
        - locked <0x00000000ba59aab8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000ba59aa60> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:454)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"kafka-producer-network-thread | 
samza_producer-shadow_ListingAlertsEmailOrganizerInstant-1" #31 daemon prio=5 
os_prio=0 tid=0x00007f97b0353000 nid=0xe98 runnable [0x00007f979e823000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000000ba60f958> (a sun.nio.ch.Util$3)
        - locked <0x00000000ba60f968> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000ba60f910> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:454)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"SAMZA-METRIC-SNAPSHOT-REPORTER" #27 daemon prio=5 os_prio=0 
tid=0x00007f97c57e5000 nid=0xe79 waiting on condition [0x00007f979eb26000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000ba649ef8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"SAMZA-StatsdMetricsReporter" #26 daemon prio=5 os_prio=0 
tid=0x00007f97c57e0800 nid=0xe78 waiting on condition [0x00007f979ec27000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000ba5d57f8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"SAMZA-JVM-METRICS" #25 daemon prio=5 os_prio=0 tid=0x00007f97c57df000 
nid=0xe76 waiting on condition [0x00007f979ed28000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000ba5d5a58> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"metrics-meter-tick-thread-2" #24 daemon prio=5 os_prio=0 
tid=0x00007f97c553d800 nid=0xe46 waiting on condition [0x00007f979f92a000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000ba64a158> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"metrics-meter-tick-thread-1" #23 daemon prio=5 os_prio=0 
tid=0x00007f97c553a800 nid=0xe45 waiting on condition [0x00007f979fa2b000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000ba64a158> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI Scheduler(0)" #21 daemon prio=5 os_prio=0 tid=0x00007f97c5551800 nid=0xde1 
waiting on condition [0x00007f979fd2c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000c14bfb70> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI RenewClean-[<Samza host dns>:45477]" #19 daemon prio=5 os_prio=0 
tid=0x00007f97b0156000 nid=0xddb in Object.wait() [0x00007f979ff2e000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked <0x00000000ba5d5c98> (a java.lang.ref.ReferenceQueue$Lock)
        at 
sun.rmi.transport.DGCClient$EndpointEntry$RenewCleanThread.run(DGCClient.java:563)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"GC Daemon" #17 daemon prio=2 os_prio=0 tid=0x00007f97c5389000 nid=0xdab in 
Object.wait() [0x00007f97a0143000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000c14c9fe0> (a sun.misc.GC$LatencyLock)
        at sun.misc.GC$Daemon.run(GC.java:117)
        - locked <0x00000000c14c9fe0> (a sun.misc.GC$LatencyLock)

   Locked ownable synchronizers:
        - None

"RMI Reaper" #16 prio=5 os_prio=0 tid=0x00007f97c5387000 nid=0xda9 in 
Object.wait() [0x00007f97a0244000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000c14be9b8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked <0x00000000c14be9b8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at sun.rmi.transport.ObjectTable$Reaper.run(ObjectTable.java:351)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-45477" #15 daemon prio=5 os_prio=0 tid=0x00007f97c5368000 
nid=0xda4 runnable [0x00007f97a0345000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at 
java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at 
sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at 
sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-0" #14 daemon prio=5 os_prio=0 tid=0x00007f97c5363800 nid=0xda2 
runnable [0x00007f97a0446000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at 
java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at 
sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at 
sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"AsyncAppender-Dispatcher-Thread-1" #12 daemon prio=5 os_prio=0 
tid=0x00007f97c5194800 nid=0xd5a in Object.wait() [0x00007f97a1093000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.log4j.AsyncAppender$Dispatcher.run(AsyncAppender.java:548)
        - locked <0x00000000c1126828> (a java.util.ArrayList)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Service Thread" #10 daemon prio=9 os_prio=0 tid=0x00007f97c4168800 nid=0xd12 
runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C1 CompilerThread3" #9 daemon prio=9 os_prio=0 tid=0x00007f97c4163800 
nid=0xd11 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread2" #8 daemon prio=9 os_prio=0 tid=0x00007f97c4161800 
nid=0xd08 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread1" #7 daemon prio=9 os_prio=0 tid=0x00007f97c415f000 
nid=0xd07 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread0" #6 daemon prio=9 os_prio=0 tid=0x00007f97c415d000 
nid=0xd06 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Signal Dispatcher" #5 daemon prio=9 os_prio=0 tid=0x00007f97c4150800 nid=0xcfd 
runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Surrogate Locker Thread (Concurrent GC)" #4 daemon prio=9 os_prio=0 
tid=0x00007f97c414e800 nid=0xcfc waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f97c4124800 nid=0xcae in 
Object.wait() [0x00007f97ad30e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked <0x00000000c0aa7fc8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

   Locked ownable synchronizers:
        - None

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f97c4120000 
nid=0xca0 in Object.wait() [0x00007f97ad40f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        - locked <0x00000000c0aa6568> (a java.lang.ref.Reference$Lock)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

   Locked ownable synchronizers:
        - None

"main" #1 prio=5 os_prio=0 tid=0x00007f97c4026800 nid=0xc58 runnable 
[0x00007f97cd560000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000ba59aee0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
        at 
org.apache.samza.util.BlockingEnvelopeMap.poll(BlockingEnvelopeMap.java:140)
        at 
org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
        at 
org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
        at 
scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:104)
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:187)
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:181)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:181)
        at 
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:76)
        at 
org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:107)
        at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:830)
        at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:828)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at 
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:828)
        at 
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:719)
        at 
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:122)
        at 
org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:89)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

   Locked ownable synchronizers:
        - None

"VM Thread" os_prio=0 tid=0x00007f97c4116000 nid=0xc81 runnable 

"Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c4037800 
nid=0xc59 runnable 

"Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c4039000 
nid=0xc5a runnable 

"Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c403b000 
nid=0xc5c runnable 

"Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c403c800 
nid=0xc5e runnable 

"Gang worker#4 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c403e800 
nid=0xc5f runnable 

"Gang worker#5 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c4040800 
nid=0xc60 runnable 

"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c4042000 
nid=0xc61 runnable 

"Gang worker#7 (Parallel GC Threads)" os_prio=0 tid=0x00007f97c4044000 
nid=0xc62 runnable 

"Concurrent Mark-Sweep GC Thread" os_prio=0 tid=0x00007f97c40b5000 nid=0xc6b 
runnable 

"Gang worker#0 (Parallel CMS Threads)" os_prio=0 tid=0x00007f97c40b1000 
nid=0xc69 runnable 

"Gang worker#1 (Parallel CMS Threads)" os_prio=0 tid=0x00007f97c40b3000 
nid=0xc6a runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f97c416b000 nid=0xd14 waiting on 
condition 

JNI global references: 313


> Some Samza Containers get stuck at "Starting BrokerProxy for 
> hostname:portnum" while others seem to be fine
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1371
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1371
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.11.0, 0.12.0
>         Environment: Samza version: 0.11, 0.12
> Kafka version: 0.11.0.0
>            Reporter: Ak Ka
>            Assignee: Yi Pan (Data Infrastructure)
>            Priority: Blocker
>
> We have multiple Samza apps using local store that have this issue. Some 
> containers get stuck on "Starting BrokerProxy for hostname:portnum" while 
> others seem to work as expected.  
> Here is the log:
> stuck:
> ```
> [...]
> 2017-07-25 17:11:26.546 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.547 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Validating offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]
> 2017-07-25 17:11:26.648 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Able to successfully read from offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]. 
> Using it to instantiate consumer.
> 2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Starting BrokerProxy for hostname:portnum
> // it's dead, Jim
> ```
> healthy:
> ```
> [...]
> 2017-07-25 17:11:26.920 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.921 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Validating offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Able to successfully read from offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]. 
> Using it to instantiate consumer.
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Starting BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.239 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer [INFO] Reconnect due 
> to socket error: java.nio.channels.ClosedChannelException
> 2017-07-25 17:11:29.244 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [WARN] Restarting consumer due to 
> java.nio.channels.ClosedChannelException. Releasing ownership of all 
> partitions, and restarting consumer. Turn on debugging to get a full stack 
> trace.
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Abdicating for 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1]
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Refreshing brokers 
> for: 
> Map([prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1] -> 
> 13572)
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.247 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.248 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.265 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.523 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.601 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.663 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.668 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Starting host statistics monitor
> 2017-07-25 17:11:29.670 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Registering task instances with producers.
> 2017-07-25 17:11:29.674 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Starting producer multiplexer.
> 2017-07-25 17:11:29.675 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Initializing stream tasks.
> 2017-07-25 17:11:29.676 [main] 
> com.company.samza.app.companyStreamingAppWrapper [INFO] Initializing instance 
> of streaming application
> 2017-07-25 17:11:29.681 [main] 
> com.company.samza.app.companyStreamingAppWrapper [INFO] First initialization. 
> Setting up Guice container with configuration 
> companyStreamingAppWrapperConfiguration{company.app.name=AlertsOrganizerInstant,
>  company.appgroup=aws, company.env=prod, 
> company.guice.module=com.company.notifications.Alerts.organizer..AlertsOrganizerModule}
> 2017-07-25 17:11:30.118 [main] com.company.config.guice.configModule [INFO] 
> configModule loaded requested override file 
> '/storage/data/secure/config/AnalyticsServiceClient.cfg'
> 2017-07-25 17:11:30.480 [main] 
> com.company.samza.dataService.SamzaSessionFactoriesModule [INFO] Loading prod 
> dbConfig from /data/config/prod.database.properties
> // Hibernate stuff (i.e. our code is hit)
> ```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to