[https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179014#comment-14179014]
Bhavesh Mistry edited comment on KAFKA-1710 at 10/24/14 6:21 PM:
-----------------------------------------------------------------
[~jkreps],
Sorry I did not get back to you sooner. Enqueuing messages into a single partition cuts per-thread throughput by roughly 54% compared to round-robin across all partitions (tested with 32 partitions on one topic against a 3-broker cluster). The measurement covers only the cost of putting data into the producer's buffer, not the cost of sending it to the brokers.
Here are the results of the test:
To a *single* partition:
Throughput per Thread=2666.6666666666665 byte(s)/millisecond
All done...!
To *all* partitions (round-robin):
Throughput per Thread=5818.181818181818 byte(s)/millisecond
All done...!
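A quick sanity check on the two numbers above (not part of the test program itself):
{code}
// Ratio of the two reported per-thread throughputs.
public class ThroughputRatio {
    public static void main(String[] args) {
        double singlePartition = 2666.6666666666665; // byte(s)/millisecond
        double allPartitions = 5818.181818181818;    // byte(s)/millisecond
        // Prints ~0.458: the single-partition run reaches about 46% of the
        // round-robin rate, i.e. a drop of roughly 54%.
        System.out.printf("single/all = %.3f%n", singlePartition / allPartitions);
    }
}
{code}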
Here is the test program:
{code}
package org.kafka.test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 75;
    static CountDownLatch latch = new CountDownLatch(numberTh);

    public static void main(String[] args) throws IOException, InterruptedException {
        // Thread.sleep(60000);
        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");
        String topic = "logmon.test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        StringBuilder builder = new StringBuilder(1024);
        int msgLength = 256;
        int numberOfLoop = 5000;
        for (int i = 0; i < msgLength; i++) {
            builder.append("a");
        }

        int numberOfProducer = 1;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));

        MyProducer[] producerThResult = new MyProducer[numberTh];
        for (int i = 0; i < numberTh; i++) {
            producerThResult[i] = new MyProducer(producer, numberOfLoop, builder.toString(), topic);
            service.execute(producerThResult[i]);
        }

        latch.await();
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All Producers done...!");

        // Interpret the result: take the fastest thread's total enqueue time.
        long lowestTime = 0;
        for (int i = 0; i < producerThResult.length; i++) {
            if (i == 0) {
                lowestTime = producerThResult[i].totalTimeinNano;
            } else if (producerThResult[i].totalTimeinNano < lowestTime) {
                lowestTime = producerThResult[i].totalTimeinNano;
            }
        }

        long bytesSent = msgLength * numberOfLoop;
        long durationInMs = TimeUnit.MILLISECONDS.convert(lowestTime, TimeUnit.NANOSECONDS);
        double throughput = (bytesSent * 1.0) / durationInMs;
        System.out.println("Throughput per Thread=" + throughput + " byte(s)/millisecond");
        System.out.println("All done...!");
    }

    static class MyProducer implements Callable<Long>, Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;
        long totalTimeinNano = 0;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            // Single-partition case: always send to partition 1.
            // ProducerRecord record = new ProducerRecord(topic, 1, null, msg.getBytes());
            // Round-robin case: no partition and no key, so the default partitioner spreads the load.
            ProducerRecord record = new ProducerRecord(topic, null, null, msg.getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            // Time only the enqueue into the producer's buffer.
                            long start = System.nanoTime();
                            producer[i].send(record, callBack);
                            long end = System.nanoTime();
                            totalTimeinNano += (end - start);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }

        public Long call() throws Exception {
            run();
            return totalTimeinNano;
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}
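For reference, the test loads its producer configuration from kafkaproducer.properties on the classpath; that file is not included in this comment. Below is a minimal sketch of an equivalent programmatic configuration, assuming standard new-producer config keys; the broker list and tuning values are placeholders rather than the ones used in the measurements above.
{code}
// Hypothetical stand-in for kafkaproducer.properties; the broker list and
// tuning values below are placeholders, not the ones used in the runs above.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class ProducerConfigSketch {
    public static Producer createProducer() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder brokers
        prop.put("acks", "1");
        prop.put("batch.size", "16384");
        prop.put("linger.ms", "0");
        prop.put("buffer.memory", "33554432");
        return new KafkaProducer(prop);
    }
}
{code}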
> [New Java Producer Potential Deadlock] Producer deadlock when all messages are
> being sent to a single partition
> ------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Environment: Development
> Reporter: Bhavesh Mistry
> Assignee: Ewen Cheslack-Postava
> Priority: Critical
> Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png,
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump,
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump,
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test that sends messages to a single partition for about 3
> minutes, I encounter a deadlock (please see the attached screenshots) and
> thread contention in YourKit profiling.
> Use Case:
> 1) Aggregating messages into the same partition for metric counting.
> 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes.
> Here is the output:
> Frozen threads found (potential deadlock)
>
> It seems that the following threads have not changed their stack for more
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
> byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run()
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
> byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run()
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
> byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord,
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run()
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> Thanks,
> Bhavesh
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)