[ https://issues.apache.org/jira/browse/KAFKA-3552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Liquan Pei reassigned KAFKA-3552:
---------------------------------
Assignee: Liquan Pei (was: Neha Narkhede)
> New Consumer: java.lang.OutOfMemoryError: Direct buffer memory
> --------------------------------------------------------------
>
> Key: KAFKA-3552
> URL: https://issues.apache.org/jira/browse/KAFKA-3552
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.1
> Reporter: Kanak Biscuitwala
> Assignee: Liquan Pei
>
> I'm running Kafka's new consumer with message handlers that can sometimes
> take a long time to return, combined with manual offset management (to get
> at-least-once semantics). Since poll() is the only way to send heartbeats
> with the new consumer, I have a thread that runs every 500 milliseconds and
> does the following:
> 1) Pause all partitions
> 2) Call poll(0)
> 3) Resume all partitions
> For the record, all accesses to KafkaConsumer are protected by synchronized
> blocks. This generally works, but I'm occasionally seeing messages like this:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:658)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
> at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> {code}
> In addition, when I'm reporting offsets, I'm seeing:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:658)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
> at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
> {code}
> Given that I'm just calling the library, this behavior is unexpected.
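
For readers following along, the heartbeat workaround described in the report (pause all partitions, call poll(0), resume, all under a lock, every 500 milliseconds) might look roughly like the sketch below. This is not the reporter's code. To keep it self-contained, PausableConsumer is a hypothetical stand-in interface and not a Kafka type; HeartbeatSketch, heartbeatOnce and start are likewise illustrative names. The try/finally around poll is an addition of this sketch, not something the report mentions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatSketch {
    /** Hypothetical stand-in for the consumer calls used here; NOT a Kafka type. */
    public interface PausableConsumer {
        void pauseAll();
        void poll(long timeoutMs);
        void resumeAll();
    }

    /** One heartbeat tick: pause, poll(0), resume, all while holding the shared lock. */
    public static void heartbeatOnce(PausableConsumer consumer, Object lock) {
        synchronized (lock) {
            consumer.pauseAll();
            try {
                // With everything paused, poll(0) should return no records
                // but still drives the client's network I/O.
                consumer.poll(0);
            } finally {
                // Assumption of this sketch: resume even if poll throws.
                consumer.resumeAll();
            }
        }
    }

    /** Runs heartbeatOnce every 500 ms on a single background thread. */
    public static ScheduledExecutorService start(PausableConsumer consumer, Object lock) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> heartbeatOnce(consumer, lock), 0, 500, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

With a real consumer, the three interface methods would presumably delegate to KafkaConsumer's pause, poll(0) and resume over the currently assigned partitions, and the message-handling thread would take the same lock around its own consumer calls. Note that scheduleAtFixedRate stops rescheduling once a run throws, so an OutOfMemoryError like the one above would silently end the heartbeat loop in this sketch.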