[
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415
]
Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM:
-----------------------------------------------------------------
{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TestNetworkDownProducer {
static int numberTh = 200;
static CountDownLatch latch = new CountDownLatch(200);
public static void main(String[] args) throws IOException,
InterruptedException {
Properties prop = new Properties();
InputStream propFile =
Thread.currentThread().getContextClassLoader()
.getResourceAsStream("kafkaproducer.properties");
String topic = "test";
prop.load(propFile);
System.out.println("Property: " + prop.toString());
StringBuilder builder = new StringBuilder(1024);
int msgLenth = 256;
for (int i = 0; i < msgLenth; i++)
builder.append("a");
int numberOfProducer = 4;
Producer[] producer = new Producer[numberOfProducer];
for (int i = 0; i < producer.length; i++) {
producer[i] = new KafkaProducer(prop);
}
ExecutorService service = new ThreadPoolExecutor(numberTh,
numberTh,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(numberTh *2));
for(int i = 0 ; i < numberTh;i++){
service.execute(new
MyProducer(producer,10,builder.toString(), topic));
}
latch.await();
System.out.println("All Producers done...!");
for (int i = 0; i < producer.length; i++) {
producer[i].close();
}
service.shutdownNow();
System.out.println("All done...!");
}
static class MyProducer implements Runnable {
Producer[] producer;
long maxloops;
String msg ;
String topic;
MyProducer(Producer[] list, long maxloops,String msg,String
topic){
this.producer = list;
this.maxloops = maxloops;
this.msg = msg;
this.topic = topic;
}
public void run() {
ProducerRecord record = new ProducerRecord(topic,
msg.toString().getBytes());
Callback callBack = new MyCallback();
try{
for(long j=0 ; j < maxloops ; j++){
try {
for (int i = 0; i <
producer.length; i++) {
producer[i].send(record, callBack);
}
Thread.sleep(10);
} catch (Throwable th) {
System.err.println("FATAL ");
th.printStackTrace();
}
}
}finally {
latch.countDown();
}
}
}
static class MyCallback implements Callback {
public void onCompletion(RecordMetadata metadata, Exception
exception) {
if(exception != null){
System.err.println("Msg dropped..!");
exception.printStackTrace();
}
}
}
}
{code}
{code }
# This is for the new producer API (trunk). Please see the configuration at
# https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 128 MiB of buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# DEFAULT FROM THE KAFKA...
# batch size = ((buffer.memory) / (number of partitions)), so that an in-progress
# batch can be created for each partition.
batch.size=1048576
# 1 MiB max request size; 2 MiB send buffer
max.request.size=1048576
send.buffer.bytes=2097152
# We do not want to block when the buffer is full, so the application thread will
# not be blocked, but log lines will be dropped.
block.on.buffer.full=false
#2MiB
send.buffer.bytes=2097152
#wait...
linger.ms=5000
{code}
was (Author: bmis13):
{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TestNetworkDownProducer {
static int numberTh = 200;
static CountDownLatch latch = new CountDownLatch(200);
public static void main(String[] args) throws IOException,
InterruptedException {
Properties prop = new Properties();
InputStream propFile =
Thread.currentThread().getContextClassLoader()
.getResourceAsStream("kafkaproducer.properties");
String topic = "test";
prop.load(propFile);
System.out.println("Property: " + prop.toString());
StringBuilder builder = new StringBuilder(1024);
int msgLenth = 256;
for (int i = 0; i < msgLenth; i++)
builder.append("a");
int numberOfProducer = 4;
Producer[] producer = new Producer[numberOfProducer];
for (int i = 0; i < producer.length; i++) {
producer[i] = new KafkaProducer(prop);
}
ExecutorService service = new ThreadPoolExecutor(numberTh,
numberTh,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(numberTh *2));
for(int i = 0 ; i < numberTh;i++){
service.execute(new
MyProducer(producer,10,builder.toString(), topic));
}
latch.await();
System.out.println("All Producers done...!");
for (int i = 0; i < producer.length; i++) {
producer[i].close();
}
service.shutdownNow();
System.out.println("All done...!");
}
static class MyProducer implements Runnable {
Producer[] producer;
long maxloops;
String msg ;
String topic;
MyProducer(Producer[] list, long maxloops,String msg,String
topic){
this.producer = list;
this.maxloops = maxloops;
this.msg = msg;
this.topic = topic;
}
public void run() {
ProducerRecord record = new ProducerRecord(topic,
msg.toString().getBytes());
Callback callBack = new MyCallback();
try{
for(long j=0 ; j < maxloops ; j++){
try {
for (int i = 0; i <
producer.length; i++) {
producer[i].send(record, callBack);
}
Thread.sleep(10);
} catch (Throwable th) {
System.err.println("FATAL ");
th.printStackTrace();
}
}
}finally {
latch.countDown();
}
}
}
static class MyCallback implements Callback {
public void onCompletion(RecordMetadata metadata, Exception
exception) {
if(exception != null){
System.err.println("Msg dropped..!");
exception.printStackTrace();
}
}
}
}
{code}
Property File....
{code }
# This is for the new producer API (trunk). Please see the configuration at
# https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 128 MiB of buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# DEFAULT FROM THE KAFKA...
# batch size = ((buffer.memory) / (number of partitions)), so that an in-progress
# batch can be created for each partition.
batch.size=1048576
# 1 MiB max request size; 2 MiB send buffer
max.request.size=1048576
send.buffer.bytes=2097152
# We do not want to block when the buffer is full, so the application thread will
# not be blocked, but log lines will be dropped.
block.on.buffer.full=false
#2MiB
send.buffer.bytes=2097152
#wait...
linger.ms=5000
{code}
> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network
> connection is lost
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.2
> Reporter: Bhavesh Mistry
> Assignee: Jun Rao
>
> I see my CPU spike to 100% when the network connection is lost for a while. It
> seems the network I/O threads are very busy logging the following error message.
> Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka
> producer I/O thread:
> java.lang.IllegalStateException: No entry found for node -2
> at
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)