[ https://issues.apache.org/jira/browse/KAFKA-6709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zou Tao updated KAFKA-6709:
---------------------------
    Description: 
I have updated to release 1.0.1.

I set up a cluster with four brokers.
 You can find the server.properties in the attachments.
 There are about 150 topics with about 4000 partitions in total; the 
replication factor is 2.
 Connectors are used to write data to and read data from the brokers.
 The connector version is 0.10.1.
 The average message size is 500 B, at around 60000 messages per second.
 One of the brokers keeps reporting OOM and fails to handle requests like:

[2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request 
{replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[{partition=16,fetch_offset=51198,max_bytes=60728640},{partition=12,fetch_offset=50984,max_bytes=60728640}]}]} 
(kafka.server.KafkaApis)
java.lang.OutOfMemoryError: Java heap space
         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
         at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
         at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523)
         at scala.Option.map(Option.scala:146)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513)
         at scala.Option.flatMap(Option.scala:171)
         at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
         at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
         at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041)
         at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
         at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
         at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574)
         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593)
         at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
         at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:592)
         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
         at kafka.server.KafkaApis.handle(KafkaApis.scala:99)
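
The allocation in the trace fails inside downConvert, which is consistent with the broker converting records to an older message format for the 0.10.1 clients. As a rough sketch of the scale involved, the Python snippet below sums the max_bytes limits from the fetch request logged above. It assumes, for illustration only, that the heap needed for one request can approach the sum of the per-partition max_bytes values; the function name worst_case_fetch_bytes is hypothetical and not part of Kafka.

```python
import re

# The fetch request exactly as it appears in the broker log above.
REQUEST = (
    "{replica_id=-1,max_wait_time=500,min_bytes=1,"
    "topics=[{topic=voltetraffica.data,partitions=["
    "{partition=16,fetch_offset=51198,max_bytes=60728640},"
    "{partition=12,fetch_offset=50984,max_bytes=60728640}]}]}"
)


def worst_case_fetch_bytes(request: str) -> int:
    """Sum every per-partition max_bytes limit found in a logged fetch request.

    Assumption: this is only an upper bound on the data one request may pull
    into the heap, not a measurement of what the broker actually allocated.
    """
    return sum(int(value) for value in re.findall(r"max_bytes=(\d+)", request))


total = worst_case_fetch_bytes(REQUEST)
print(f"{total} bytes (~{total / 2**20:.1f} MiB) for one request")
```

For this request the sum is 121457280 bytes, so under that assumption a number of such requests handled at the same time could take a significant share of a 2 GB or 4 GB heap.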

After that, the broker logs a lot of ISR shrinks (this broker is 1001):

[2018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition s1mme.data-72 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition gnup.sink.status.storage.topic-17 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,287] INFO [Partition probessgsniups.sink.offset.storage.topic-4 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:01,447] INFO [GroupCoordinator 1001]: Stabilized group connect-VOICE_1_SINK_CONN generation 26 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
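
To see which replica is falling out of sync, the shrink messages can be tallied per broker. The following is a minimal sketch, assuming the log lines have been unwrapped to one entry per line; the regular expression and the function name dropped_replicas are illustrative and not part of any Kafka tool.

```python
import re
from collections import Counter

# The four "Shrinking ISR" entries from the log above, one entry per line.
LOG = """\
[2018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition s1mme.data-72 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition gnup.sink.status.storage.topic-17 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,287] INFO [Partition probessgsniups.sink.offset.storage.topic-4 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
"""

SHRINK = re.compile(
    r"\[Partition (?P<partition>\S+) broker=(?P<leader>\d+)\] "
    r"Shrinking ISR from (?P<old>[\d,]+) to (?P<new>[\d,]+)"
)


def dropped_replicas(log_text: str) -> Counter:
    """Count, per broker id, how many partitions removed that broker from the ISR."""
    dropped = Counter()
    for match in SHRINK.finditer(log_text):
        old_isr = set(match.group("old").split(","))
        new_isr = set(match.group("new").split(","))
        for broker_id in old_isr - new_isr:
            dropped[broker_id] += 1
    return dropped


print(dropped_replicas(LOG))
```

In the excerpt, every shrink removes broker 1002 from an ISR led by 1001, which would fit a follower that can no longer fetch from 1001 in time, although the excerpt alone does not prove the cause.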

I can't dump the heap, because each time I run jcmd it fails:
 [root@sslave1 kafka]# jcmd 55409 GC.heap_dump /home/ngdb/heap_dump175.hprof
 55409:
 com.sun.tools.attach.AttachNotSupportedException: Unable to open socket file: target process not responding or HotSpot VM not loaded
         at sun.tools.attach.LinuxVirtualMachine.<init>(LinuxVirtualMachine.java:106)
         at sun.tools.attach.LinuxAttachProvider.attachVirtualMachine(LinuxAttachProvider.java:63)
         at com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:208)
         at sun.tools.jcmd.JCmd.executeCommandForPid(JCmd.java:147)
         at sun.tools.jcmd.JCmd.main(JCmd.java:131)
         
 The JVM parameters are:
 -XX:+ExplicitGCInvokesConcurrent -XX:GCLogFileSize=104857600 
-XX:InitialHeapSize=2147483648 -XX:InitiatingHeapOccupancyPercent=35 
-XX:+ManagementServer -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=4294967296 
-XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers 
-XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation    
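
The heap-related flags above are given in bytes, which makes them hard to read at a glance. As a small sketch, the snippet below converts the three byte-valued flags to MiB; the helper name flag_sizes_mib is hypothetical, and only the flags whose values are byte counts are included in the sample string.

```python
import re

# Only the byte-valued flags from the list above; percentages and
# millisecond settings are left out on purpose.
FLAGS = (
    "-XX:GCLogFileSize=104857600 "
    "-XX:InitialHeapSize=2147483648 "
    "-XX:MaxHeapSize=4294967296"
)


def flag_sizes_mib(flags: str) -> dict:
    """Map each -XX:Name=<bytes> flag to its value expressed in MiB."""
    return {
        name: int(value) / 2**20
        for name, value in re.findall(r"-XX:(\w+)=(\d+)", flags)
    }


for name, mib in flag_sizes_mib(FLAGS).items():
    print(f"{name}: {mib:.0f} MiB")
```

InitialHeapSize and MaxHeapSize come out as 2048 MiB and 4096 MiB, which corresponds to an initial heap of 2 GB and the 4 GB maximum heap described in the report.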

When I used -Xmx2G, all four brokers reported OOM.

After I increased it to 4G, only one broker reported OOM.

 


    


> broker failed to handle request due to OOM
> ------------------------------------------
>
>                 Key: KAFKA-6709
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6709
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.0.1
>            Reporter: Zou Tao
>            Priority: Critical
>         Attachments: kafkaServer-gc.log.0.current.zip, kafkaServer.out.tgz, 
> server.properties
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
