[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14202291#comment-14202291 ]

Ewen Cheslack-Postava commented on KAFKA-1745:
----------------------------------------------

[~Vishal M] I looked into this a bit more. I was testing the new producer which 
is why I was seeing different behaviors. With your test program I can reproduce 
the issue.

I tried to track down what was triggering the new KQUEUEs and PIPEs. I thought 
this would only happen when new sockets were created, but logging socket 
creation (and connection requests) in BlockingChannel doesn't indicate we're 
opening any after the initial setup. I traced the system calls with dtrace, 
but stack traces in dtrace are broken on OS X, so instead I had dtrace pause 
the process and captured a stack trace with jstack -F. This looks like the 
offending thread:

{quote}
Thread 25099: (state = IN_NATIVE)
 - sun.nio.ch.KQueueArrayWrapper.init() @bci=0 (Interpreted frame)
 - sun.nio.ch.KQueueArrayWrapper.<init>() @bci=59, line=100 (Interpreted frame)
 - sun.nio.ch.KQueueSelectorImpl.<init>(java.nio.channels.spi.SelectorProvider) @bci=51, line=87 (Interpreted frame)
 - sun.nio.ch.KQueueSelectorProvider.openSelector() @bci=5, line=42 (Interpreted frame)
 - sun.nio.ch.Util.getTemporarySelector(java.nio.channels.SelectableChannel) @bci=54, line=264 (Interpreted frame)
 - sun.nio.ch.SocketAdaptor$SocketInputStream.read(java.nio.ByteBuffer) @bci=197, line=214 (Interpreted frame)
 - sun.nio.ch.ChannelInputStream.read(byte[], int, int) @bci=101, line=103 (Interpreted frame)
 - java.nio.channels.Channels$ReadableByteChannelImpl.read(java.nio.ByteBuffer) @bci=84, line=385 (Interpreted frame)
 - kafka.utils.Utils$.read(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) @bci=2, line=380 (Interpreted frame)
 - kafka.network.BoundedByteBufferReceive.readFrom(java.nio.channels.ReadableByteChannel) @bci=26, line=54 (Interpreted frame)
 - kafka.network.Receive$class.readCompletely(kafka.network.Receive, java.nio.channels.ReadableByteChannel) @bci=15, line=56 (Interpreted frame)
 - kafka.network.BoundedByteBufferReceive.readCompletely(java.nio.channels.ReadableByteChannel) @bci=2, line=29 (Interpreted frame)
 - kafka.network.BlockingChannel.receive() @bci=20, line=111 (Interpreted frame)
 - kafka.producer.SyncProducer.liftedTree1$1(kafka.api.RequestOrResponse, boolean, scala.runtime.ObjectRef) @bci=18, line=76 (Interpreted frame)
 - kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(kafka.api.RequestOrResponse, boolean) @bci=33, line=73 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp() @bci=40, line=104 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply() @bci=1, line=104 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply() @bci=1, line=104 (Interpreted frame)
 - kafka.metrics.KafkaTimer.time(scala.Function0) @bci=9, line=33 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp() @bci=12, line=103 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1.apply() @bci=1, line=103 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1.apply() @bci=1, line=103 (Interpreted frame)
 - kafka.metrics.KafkaTimer.time(scala.Function0) @bci=9, line=33 (Interpreted frame)
 - kafka.producer.SyncProducer.send(kafka.api.ProducerRequest) @bci=90, line=102 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(int, scala.collection.mutable.Map) @bci=134, line=256 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(scala.Tuple2) @bci=98, line=107 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(java.lang.Object) @bci=5, line=99 (Interpreted frame)
 - scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(java.lang.Object) @bci=24, line=772 (Interpreted frame)
 - scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(scala.collection.mutable.DefaultEntry) @bci=19, line=98 (Interpreted frame)
 - scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(java.lang.Object) @bci=5, line=98 (Interpreted frame)
 - scala.collection.mutable.HashTable$class.foreachEntry(scala.collection.mutable.HashTable, scala.Function1) @bci=26, line=226 (Interpreted frame)
 - scala.collection.mutable.HashMap.foreachEntry(scala.Function1) @bci=2, line=39 (Interpreted frame)
 - scala.collection.mutable.HashMap.foreach(scala.Function1) @bci=10, line=98 (Interpreted frame)
 - scala.collection.TraversableLike$WithFilter.foreach(scala.Function1) @bci=13, line=771 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler.dispatchSerializedData(scala.collection.Seq) @bci=65, line=99 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler.handle(scala.collection.Seq) @bci=214, line=72 (Interpreted frame)
 - kafka.producer.Producer.send(scala.collection.Seq) @bci=45, line=76 (Interpreted frame)
 - kafka.javaapi.producer.Producer.send(kafka.producer.KeyedMessage) @bci=21, line=33 (Interpreted frame)
 - kafka.examples.KQTestX$1.run() @bci=83, line=73 (Interpreted frame)
 - java.util.concurrent.Executors$RunnableAdapter.call() @bci=4, line=471 (Interpreted frame)
 - java.util.concurrent.FutureTask.run() @bci=42, line=262 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
{quote}

which shows that NIO is creating the new kqueue internally during a normal 
read (and KQueueSelectorImpl also creates a pipe). I did verify that this 
doesn't happen unless new threads are allocated, but I'm still not sure why a 
new thread would trigger it, or how to fix it, since it's internal to NIO.
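
As a sketch of why only fresh threads trigger this: Util.getTemporarySelector caches the temporary selector per thread, so the first blocking read on each new thread opens a fresh Selector (which on OS X allocates a kqueue plus a pipe), and nothing closes it promptly when the thread is discarded. The following is a hypothetical simulation of that per-thread caching, not JDK or Kafka code — the ThreadLocal, the counter, and the class name are mine:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class TemporarySelectorDemo {
    // Counts how many Selectors (each a kqueue + pipe on OS X) get opened.
    static final AtomicInteger selectorsOpened = new AtomicInteger();

    // Mirrors (in spirit) the JDK's per-thread temporary-selector cache:
    // the first blocking read on a thread opens a Selector and caches it,
    // and it is never explicitly closed when the thread goes away.
    static final ThreadLocal<Selector> cached = ThreadLocal.withInitial(() -> {
        try {
            selectorsOpened.incrementAndGet();
            return Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });

    // Stands in for a timed blocking read that touches the cached selector.
    static void simulatedRead() {
        cached.get();
    }

    public static void main(String[] args) throws Exception {
        // Reusing one thread: the selector is opened once, then reused.
        ExecutorService one = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 5; i++) {
            one.submit(TemporarySelectorDemo::simulatedRead).get();
        }
        one.shutdown();
        System.out.println("reused thread -> selectors: " + selectorsOpened.get()); // 1

        // A fresh thread per send: one new selector (kqueue + pipe) per thread,
        // and the old ones linger as open files until GC/thread cleanup.
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(TemporarySelectorDemo::simulatedRead);
            t.start();
            t.join();
        }
        System.out.println("fresh threads -> selectors: " + selectorsOpened.get()); // 6
    }
}
```

If this model is right, a bounded, reused thread pool (rather than one that grows and shrinks) would cap the number of leaked descriptors even without a fix inside NIO.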

> Each new thread creates a PIPE and KQUEUE as open files during 
> producer.send() and does not get cleared when the thread that creates them is 
> cleared.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1745
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1745
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>         Environment: Mac OS Mavericks
>            Reporter: Vishal
>            Priority: Critical
>
> Hi,
>     I'm using the Java client API for Kafka. I wanted to send data to Kafka 
> through a producer pool, since I'm using a sync producer. The thread that 
> sends the data comes from a thread pool that grows and shrinks depending on 
> usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created 
> (I got this info using lsof). If I keep using the same thread it's fine, but 
> when a new thread sends data to Kafka (using producer.send()) a new KQUEUE 
> and 2 PIPEs are created.
> This is okay, but when a thread is cleared from the thread pool and a new 
> one is created, yet more KQUEUEs and PIPEs are created. The problem is that 
> the old ones are not destroyed and they keep showing up as open files. This 
> is a major problem because the number of open files keeps increasing and 
> never decreases.
> Please suggest any solutions.
> FYI, the number of TCP connections established from the producer system to 
> the Kafka broker remains constant throughout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
