[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15724657#comment-15724657
 ] 

Niranjan Nanda edited comment on KAFKA-4489 at 12/6/16 7:15 AM:
----------------------------------------------------------------

Ismael, I did understand the issue; my question is why it ends in an OOM. 
This is a case where the server is listening on both a PLAINTEXT and an SSL 
port, and the consumer is trying to connect to the SSL port using the 
PLAINTEXT protocol. It is similar to using port 443 for an HTTP (instead of 
HTTPS) connection. So it should result in either a connection timeout or some 
sort of connection failure message, not an OOM. This is very confusing!

Per your explanation, it also looks like the socket listener does not 
understand the difference between SSL handshake packets and message packets. I 
am not sure if this is the right implementation.
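
To make the OOM concrete, here is a minimal sketch of how a length-prefixed 
reader can misread TLS bytes as a message size. It assumes the broker answers 
the plaintext request with a TLS alert record whose header starts with the 
bytes 0x15 0x03 0x03 0x00; that byte sequence, the class name SizePrefixDemo 
and the helper decodeSizePrefix are illustrative assumptions, not Kafka code.

```java
import java.nio.ByteBuffer;

public class SizePrefixDemo {

    // Hypothetical helper: reads the first four bytes as a big-endian int,
    // the way a 4-byte length-prefixed framing would read a message size.
    static int decodeSizePrefix(byte[] firstBytes) {
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumed reply bytes: TLS alert record header
        // (content type 0x15, version 0x03 0x03, length 0x00 0x02).
        byte[] tlsAlertHeader = {0x15, 0x03, 0x03, 0x00, 0x02};

        int size = decodeSizePrefix(tlsAlertHeader);

        // A reader that trusted this value would try to allocate a buffer
        // of this many bytes before reading any payload.
        System.out.println("decoded size = " + size + " bytes");
    }
}
```

Under that assumption the decoded value is 0x15030300, roughly 336 MB, so a 
reader that allocates the buffer up front with no upper bound could exhaust 
the heap instead of reporting a connection failure.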



> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4489
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4489
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.0.0
>            Reporter: Niranjan Nanda
>
> I configured the Kafka brokers for SSL. On the consumer side, I set the 
> following properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could see the following exceptions in the Kafka log:
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> <host1>:9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker <host2>:9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker <host2>:9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> <host3>:9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with <host1>/<IP> 
> disconnected
> java.io.EOFException
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>     at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at com.demo.consumer.Consumer.run(Consumer.java:71)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> And the following stack traces are present in my app log:
> java.lang.OutOfMemoryError: Java heap space
>       at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>       at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>       at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>       at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>       at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>       at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>       at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>       at com.demo.consumer.Consumer.run(Consumer.java:71)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 2016-12-05 22:36:58.955 [Thread: pool-2-thread-15] DEBUG 
> com.demo.consumer.Consumer# - Uncaught exception while consuming the 
> message...
> java.lang.OutOfMemoryError: Java heap space
>       at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>       at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>       at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>       at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>       at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>       at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>       at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>       at com.demo.consumer.Consumer.run(Consumer.java:71)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 2016-12-05 22:37:03.817 [Thread: pool-2-thread-8] DEBUG 
> com.demo.consumer.Consumer# - Uncaught exception while consuming the 
> message...
> java.lang.OutOfMemoryError: Direct buffer memory
>       at java.nio.Bits.reserveMemory(Bits.java:693)
>       at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>       at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>       at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>       at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>       at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
>       at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>       at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>       at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>       at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>       at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>       at com.demo.consumer.Consumer.run(Consumer.java:71)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> The message consumption was not working.
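
The properties quoted above do not set security.protocol, which defaults to 
PLAINTEXT in the Kafka clients. A minimal sketch of a consumer configuration 
that speaks SSL to port 9093 might look like the following. It uses plain 
string keys so it runs without the Kafka client library on the classpath; the 
truststore path and password are placeholders, and the class name 
SslConsumerProps is hypothetical.

```java
import java.util.Properties;

public class SslConsumerProps {

    // Builds the same properties as in the report, plus the SSL settings.
    static Properties build() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "host1:9093,host2:9093,host3:9093");
        kafkaProps.put("group.id", "my_group1");
        kafkaProps.put("auto.offset.reset", "earliest");
        kafkaProps.put("client.id", "host_name_of_machine");

        // Tell the client to use SSL instead of the default PLAINTEXT.
        kafkaProps.put("security.protocol", "SSL");
        // Placeholder values: replace with the real truststore details.
        kafkaProps.put("ssl.truststore.location", "/path/to/client.truststore.jks");
        kafkaProps.put("ssl.truststore.password", "changeit");
        return kafkaProps;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("security.protocol = " + props.getProperty("security.protocol"));
    }
}
```

These properties would then be passed to the KafkaConsumer constructor along 
with the key and value deserializer settings, which are omitted here because 
the original report does not show them.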


