[
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14202291#comment-14202291
]
Ewen Cheslack-Postava commented on KAFKA-1745:
----------------------------------------------
[~Vishal M] I looked into this a bit more. I was testing the new producer, which
is why I was seeing different behavior. With your test program I can reproduce
the issue.
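For reference, here is a minimal sketch of the kind of test program that
reproduces it (hypothetical class and topic names, not the reporter's actual
KQTestX; assumes an 0.8.x sync producer driven from a cached thread pool whose
idle threads get retired):
{code:java}
// Hypothetical repro sketch: each send that lands on a brand-new pool
// thread creates a fresh KQUEUE + 2 PIPEs, visible via `lsof -p <pid>`.
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KQueueLeakRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("producer.type", "sync");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        final Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        // Cached pools retire idle threads after 60s, so spacing the
        // sends out forces each one onto a newly created thread.
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            pool.submit(new Runnable() {
                public void run() {
                    producer.send(new KeyedMessage<String, String>("test", "msg"));
                }
            });
            TimeUnit.SECONDS.sleep(65); // let the previous thread die
        }
        pool.shutdown();
        producer.close();
    }
}
{code}
Spacing the sends more than 60 seconds apart forces each one onto a freshly
created pool thread, and lsof then shows one new KQUEUE and two new PIPEs per
pass while the old ones stick around.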
I tried to track down what was triggering the new KQUEUEs and PIPEs. I thought
this would only happen when new sockets were created, but logging
BlockingChannel creation (and connection requests) doesn't indicate we're
calling those after the initial setup. I traced the system calls with DTrace,
but stack traces in DTrace are broken on OS X, so instead I had DTrace pause
the process and then took a stack trace with jstack -F. This looks like the
offending thread:
{quote}
Thread 25099: (state = IN_NATIVE)
- sun.nio.ch.KQueueArrayWrapper.init() @bci=0 (Interpreted frame)
- sun.nio.ch.KQueueArrayWrapper.<init>() @bci=59, line=100 (Interpreted frame)
- sun.nio.ch.KQueueSelectorImpl.<init>(java.nio.channels.spi.SelectorProvider) @bci=51, line=87 (Interpreted frame)
- sun.nio.ch.KQueueSelectorProvider.openSelector() @bci=5, line=42 (Interpreted frame)
- sun.nio.ch.Util.getTemporarySelector(java.nio.channels.SelectableChannel) @bci=54, line=264 (Interpreted frame)
- sun.nio.ch.SocketAdaptor$SocketInputStream.read(java.nio.ByteBuffer) @bci=197, line=214 (Interpreted frame)
- sun.nio.ch.ChannelInputStream.read(byte[], int, int) @bci=101, line=103 (Interpreted frame)
- java.nio.channels.Channels$ReadableByteChannelImpl.read(java.nio.ByteBuffer) @bci=84, line=385 (Interpreted frame)
- kafka.utils.Utils$.read(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) @bci=2, line=380 (Interpreted frame)
- kafka.network.BoundedByteBufferReceive.readFrom(java.nio.channels.ReadableByteChannel) @bci=26, line=54 (Interpreted frame)
- kafka.network.Receive$class.readCompletely(kafka.network.Receive, java.nio.channels.ReadableByteChannel) @bci=15, line=56 (Interpreted frame)
- kafka.network.BoundedByteBufferReceive.readCompletely(java.nio.channels.ReadableByteChannel) @bci=2, line=29 (Interpreted frame)
- kafka.network.BlockingChannel.receive() @bci=20, line=111 (Interpreted frame)
- kafka.producer.SyncProducer.liftedTree1$1(kafka.api.RequestOrResponse, boolean, scala.runtime.ObjectRef) @bci=18, line=76 (Interpreted frame)
- kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(kafka.api.RequestOrResponse, boolean) @bci=33, line=73 (Interpreted frame)
- kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp() @bci=40, line=104 (Interpreted frame)
- kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply() @bci=1, line=104 (Interpreted frame)
- kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply() @bci=1, line=104 (Interpreted frame)
- kafka.metrics.KafkaTimer.time(scala.Function0) @bci=9, line=33 (Interpreted frame)
- kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp() @bci=12, line=103 (Interpreted frame)
- kafka.producer.SyncProducer$$anonfun$send$1.apply() @bci=1, line=103 (Interpreted frame)
- kafka.producer.SyncProducer$$anonfun$send$1.apply() @bci=1, line=103 (Interpreted frame)
- kafka.metrics.KafkaTimer.time(scala.Function0) @bci=9, line=33 (Interpreted frame)
- kafka.producer.SyncProducer.send(kafka.api.ProducerRequest) @bci=90, line=102 (Interpreted frame)
- kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(int, scala.collection.mutable.Map) @bci=134, line=256 (Interpreted frame)
- kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(scala.Tuple2) @bci=98, line=107 (Interpreted frame)
- kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(java.lang.Object) @bci=5, line=99 (Interpreted frame)
- scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(java.lang.Object) @bci=24, line=772 (Interpreted frame)
- scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(scala.collection.mutable.DefaultEntry) @bci=19, line=98 (Interpreted frame)
- scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(java.lang.Object) @bci=5, line=98 (Interpreted frame)
- scala.collection.mutable.HashTable$class.foreachEntry(scala.collection.mutable.HashTable, scala.Function1) @bci=26, line=226 (Interpreted frame)
- scala.collection.mutable.HashMap.foreachEntry(scala.Function1) @bci=2, line=39 (Interpreted frame)
- scala.collection.mutable.HashMap.foreach(scala.Function1) @bci=10, line=98 (Interpreted frame)
- scala.collection.TraversableLike$WithFilter.foreach(scala.Function1) @bci=13, line=771 (Interpreted frame)
- kafka.producer.async.DefaultEventHandler.dispatchSerializedData(scala.collection.Seq) @bci=65, line=99 (Interpreted frame)
- kafka.producer.async.DefaultEventHandler.handle(scala.collection.Seq) @bci=214, line=72 (Interpreted frame)
- kafka.producer.Producer.send(scala.collection.Seq) @bci=45, line=76 (Interpreted frame)
- kafka.javaapi.producer.Producer.send(kafka.producer.KeyedMessage) @bci=21, line=33 (Interpreted frame)
- kafka.examples.KQTestX$1.run() @bci=83, line=73 (Interpreted frame)
- java.util.concurrent.Executors$RunnableAdapter.call() @bci=4, line=471 (Interpreted frame)
- java.util.concurrent.FutureTask.run() @bci=42, line=262 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
{quote}
which shows that NIO is creating the new kqueue internally during a normal read
(and KQueueSelectorImpl also creates a pipe). I did verify that this doesn't
happen unless new threads are allocated, but I'm still not sure why using a new
thread triggers this or how to fix it, since it's internal to NIO.
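That said, the sun.nio.ch.Util.getTemporarySelector frame in the trace does
suggest the temporary selector is cached per thread, so one possible
workaround (a sketch under that assumption, not a verified fix) is to funnel
all blocking sends through a small set of long-lived threads so each thread's
kqueue and pipes are created once and reused:
{code:java}
// Workaround sketch (my suggestion, not a verified fix): a single
// long-lived sender thread means the per-thread NIO state is created
// once and reused, so the KQUEUE/PIPE count stays constant.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class PinnedSender {
    // One long-lived thread; could be N threads for more parallelism.
    private final ExecutorService sendThread = Executors.newSingleThreadExecutor();
    private final Producer<String, String> producer;

    public PinnedSender(Producer<String, String> producer) {
        this.producer = producer;
    }

    // Callers on any (possibly short-lived) thread hand off to the
    // pinned sender thread instead of calling producer.send() directly.
    public Future<?> send(final String topic, final String message) {
        return sendThread.submit(new Runnable() {
            public void run() {
                producer.send(new KeyedMessage<String, String>(topic, message));
            }
        });
    }

    public void shutdown() {
        sendThread.shutdown();
    }
}
{code}
With every send pinned to the same executor thread, the KQUEUE/PIPE count in
lsof should stay constant no matter how the caller's own thread pool grows and
shrinks.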
> Each new thread creates a PIPE and KQUEUE as open files during
> producer.send(), and they are not cleared when the thread that created them
> is cleared.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1745
> URL: https://issues.apache.org/jira/browse/KAFKA-1745
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.1.1
> Environment: Mac OS Mavericks
> Reporter: Vishal
> Priority: Critical
>
> Hi,
> I'm using the Java client API for Kafka. I wanted to send data to Kafka
> through a producer pool, since I'm using a sync producer. The thread that
> sends the data comes from a thread pool that grows and shrinks depending on
> usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created
> (found via lsof). If I keep using the same thread it's fine, but when a new
> thread sends data to Kafka (using producer.send()) a new KQUEUE and 2 PIPEs
> are created.
> This is okay, but when the thread is cleared from the thread pool and a new
> thread is created, new KQUEUEs and PIPEs are created again. The problem is
> that the old ones are never destroyed and keep showing up as open files.
> This is a major problem, as the number of open files keeps increasing and
> never decreases.
> Please suggest any solutions.
> FYI, the number of TCP connections established from the producer system to
> the Kafka broker remains constant throughout.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)