[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415 ]
Bhavesh Mistry commented on KAFKA-1642: --------------------------------------- {code} import java.io.IOException; import java.io.InputStream; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { public static void main(String[] args) throws IOException { Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream("kafkaproducer.properties"); String topic = "test"; prop.load(propFile); System.out.println("Property: " + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; for (int i = 0; i < msgLenth; i++) builder.append("a"); int numberOfProducer = 4; Producer[] producer = new Producer[numberOfProducer]; for (int i = 0; i < producer.length; i++) { producer[i] = new KafkaProducer(prop); } Callback callback = new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println("Msg dropped..!"); exception.printStackTrace(); } } }; ProducerRecord record = new ProducerRecord(topic, builder.toString().getBytes()); while (true) { try { for (int i = 0; i < producer.length; i++) { producer[i].send(record, callback); } Thread.sleep(10); } catch (Throwable th) { System.err.println("FATAL "); th.printStackTrace(); } } } } {code} {code: name=kafkaproducer.properties } # THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs # Broker List bootstrap.servers= BROKERS HERE... #Data Acks acks=1 # 128MB of buffer for log lines (including all messages). buffer.memory=134217728 compression.type=snappy retries=3 # DEFAULT FROM THE KAFKA... 
# batch size = ((buffer.memory) / (number of partitions)) (so that an in-progress batch can be created for each partition). batch.size=1048576 #1MiB max.request.size=1048576 send.buffer.bytes=2097152 # We do not want to block when the buffer is full, so the application thread will not be blocked, but log lines will be dropped... block.on.buffer.full=false #2MiB send.buffer.bytes=2097152 #wait... linger.ms=5000 {code} > [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network > connection is lost > --------------------------------------------------------------------------------------- > > Key: KAFKA-1642 > URL: https://issues.apache.org/jira/browse/KAFKA-1642 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 0.8.2 > Reporter: Bhavesh Mistry > Assignee: Jun Rao > > I see my CPU spike to 100% when the network connection is lost for a while. It > seems the network I/O threads are very busy logging the following error message. Is > this expected behavior? > 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR > org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka > producer I/O thread: > java.lang.IllegalStateException: No entry found for node -2 > at > org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) > at > org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) > at > org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) > at java.lang.Thread.run(Thread.java:744) > Thanks, > Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)