[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179014#comment-14179014
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/21/14 8:26 PM:
-----------------------------------------------------------------

[~jkreps],

I am sorry I did not get back to you soon.  The cost of enqueue a message into 
single partition is ~54% as compare to round-robin.  (test with 32 partition to 
single topic and 3 cluster)  The throughput is measuring the cost  of put data 
into buffer.

Here is test I have done:

To *single* partition:
Throughput per Thread=2666.6666666666665  byte(s)/microsecond
All done...!

To *all* partition:
Throughput per Thread=5818.181818181818  byte(s)/microsecond
All done...!

{code}
package org.kafka.test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {
        
        static int numberTh = 75;
        static CountDownLatch latch = new CountDownLatch(numberTh);
        public static void main(String[] args) throws IOException, 
InterruptedException {
                
                //Thread.sleep(60000);

                Properties prop = new Properties();
                InputStream propFile = 
Thread.currentThread().getContextClassLoader()
                                
.getResourceAsStream("kafkaproducer.properties");

                String topic = "logmon.test";
                prop.load(propFile);
                System.out.println("Property: " + prop.toString());
                StringBuilder builder = new StringBuilder(1024);
                int msgLenth = 256;
                int numberOfLoop = 5000;
                for (int i = 0; i < msgLenth; i++)
                        builder.append("a");

                int numberOfProducer = 1;
                Producer[] producer = new Producer[numberOfProducer];

                for (int i = 0; i < producer.length; i++) {
                        producer[i] = new KafkaProducer(prop);
                }
                ExecutorService service =   new ThreadPoolExecutor(numberTh, 
numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh *2));
                MyProducer [] producerThResult = new MyProducer [numberTh];
                for(int i = 0 ; i < numberTh;i++){
                        producerThResult[i] = new 
MyProducer(producer,numberOfLoop,builder.toString(), topic);
                        service.execute(producerThResult[i]);
                }               
                latch.await();
                for (int i = 0; i < producer.length; i++) {
                        producer[i].close();
                }               
                service.shutdownNow();
                System.out.println("All Producers done...!");
                // now interpret the result... of this...
                long lowestTime = 0 ;
                for(int i =0 ; i < producerThResult.length;i++){
                        if(i == 1){
                                lowestTime = 
producerThResult[i].totalTimeinNano;
                        }else if ( producerThResult[i].totalTimeinNano < 
lowestTime){
                                lowestTime = 
producerThResult[i].totalTimeinNano;
                        }
                }
                long bytesSend = msgLenth * numberOfLoop;
                long durationInMs = TimeUnit.MILLISECONDS.convert(lowestTime, 
TimeUnit.NANOSECONDS);

                double throughput = (bytesSend * 1.0) / (durationInMs);
                System.out.println("Throughput per Thread=" + throughput + "  
byte(s)/microsecond");
                
                System.out.println("All done...!");

        }


        
        static class MyProducer implements Callable<Long> , Runnable {
                
                Producer[] producer;
                long maxloops;
                String msg ;
                String topic;
                long totalTimeinNano = 0;
                
                MyProducer(Producer[] list, long maxloops,String msg,String 
topic){
                        this.producer = list;
                        this.maxloops = maxloops;
                        this.msg = msg;
                        this.topic = topic;
                }
                public void run() {
                        // ALWAYS SEND DATA TO PARTITION 1 only...              
                        //ProducerRecord record = new ProducerRecord(topic, 
1,null,msg.toString().getBytes());
                        ProducerRecord record = new ProducerRecord(topic, 
null,null,msg.toString().getBytes());

                        Callback  callBack = new  MyCallback();
                        try{
                                for(long j=0 ; j < maxloops ; j++){
                                        try {
                                                for (int i = 0; i < 
producer.length; i++) {
                                                        long start = 
System.nanoTime();
                                                        
producer[i].send(record, callBack);
                                                        long end = 
System.nanoTime();
                                                        totalTimeinNano += 
(end-start);
                                                }
                                                Thread.sleep(10);
                                        } catch (Throwable th) {
                                                System.err.println("FATAL ");
                                                th.printStackTrace();
                                        }
                                }

                        }finally {
                                latch.countDown();
                        }                       
                }
                public Long call() throws Exception {
                        run();
                        return totalTimeinNano;
                }
        }       

        static class MyCallback implements Callback {
                public void onCompletion(RecordMetadata metadata, Exception 
exception) {
                        if(exception != null){
                                System.err.println("Msg dropped..!");
                                exception.printStackTrace();
                        }
                        
                }
        }
        
}
{code}


was (Author: bmis13):
[~jkreps],

I am sorry I did not get back to you soon.  The cost of enqueue a message into 
single partition only is ~54%. 

Here is test I have done:

To *single* partition:
Throughput per Thread=2666.6666666666665  byte(s)/microsecond
All done...!

To *all* partition:
Throughput per Thread=5818.181818181818  byte(s)/microsecond
All done...!


The cost of sync block in roughly around  

{code}
package org.kafka.test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {
        
        static int numberTh = 75;
        static CountDownLatch latch = new CountDownLatch(numberTh);
        public static void main(String[] args) throws IOException, 
InterruptedException {
                
                //Thread.sleep(60000);

                Properties prop = new Properties();
                InputStream propFile = 
Thread.currentThread().getContextClassLoader()
                                
.getResourceAsStream("kafkaproducer.properties");

                String topic = "logmon.test";
                prop.load(propFile);
                System.out.println("Property: " + prop.toString());
                StringBuilder builder = new StringBuilder(1024);
                int msgLenth = 256;
                int numberOfLoop = 5000;
                for (int i = 0; i < msgLenth; i++)
                        builder.append("a");

                int numberOfProducer = 1;
                Producer[] producer = new Producer[numberOfProducer];

                for (int i = 0; i < producer.length; i++) {
                        producer[i] = new KafkaProducer(prop);
                }
                ExecutorService service =   new ThreadPoolExecutor(numberTh, 
numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh *2));
                MyProducer [] producerThResult = new MyProducer [numberTh];
                for(int i = 0 ; i < numberTh;i++){
                        producerThResult[i] = new 
MyProducer(producer,numberOfLoop,builder.toString(), topic);
                        service.execute(producerThResult[i]);
                }               
                latch.await();
                for (int i = 0; i < producer.length; i++) {
                        producer[i].close();
                }               
                service.shutdownNow();
                System.out.println("All Producers done...!");
                // now interpret the result... of this...
                long lowestTime = 0 ;
                for(int i =0 ; i < producerThResult.length;i++){
                        if(i == 1){
                                lowestTime = 
producerThResult[i].totalTimeinNano;
                        }else if ( producerThResult[i].totalTimeinNano < 
lowestTime){
                                lowestTime = 
producerThResult[i].totalTimeinNano;
                        }
                }
                long bytesSend = msgLenth * numberOfLoop;
                long durationInMs = TimeUnit.MILLISECONDS.convert(lowestTime, 
TimeUnit.NANOSECONDS);

                double throughput = (bytesSend * 1.0) / (durationInMs);
                System.out.println("Throughput per Thread=" + throughput + "  
byte(s)/microsecond");
                
                System.out.println("All done...!");

        }


        
        static class MyProducer implements Callable<Long> , Runnable {
                
                Producer[] producer;
                long maxloops;
                String msg ;
                String topic;
                long totalTimeinNano = 0;
                
                MyProducer(Producer[] list, long maxloops,String msg,String 
topic){
                        this.producer = list;
                        this.maxloops = maxloops;
                        this.msg = msg;
                        this.topic = topic;
                }
                public void run() {
                        // ALWAYS SEND DATA TO PARTITION 1 only...              
                        //ProducerRecord record = new ProducerRecord(topic, 
1,null,msg.toString().getBytes());
                        ProducerRecord record = new ProducerRecord(topic, 
null,null,msg.toString().getBytes());

                        Callback  callBack = new  MyCallback();
                        try{
                                for(long j=0 ; j < maxloops ; j++){
                                        try {
                                                for (int i = 0; i < 
producer.length; i++) {
                                                        long start = 
System.nanoTime();
                                                        
producer[i].send(record, callBack);
                                                        long end = 
System.nanoTime();
                                                        totalTimeinNano += 
(end-start);
                                                }
                                                Thread.sleep(10);
                                        } catch (Throwable th) {
                                                System.err.println("FATAL ");
                                                th.printStackTrace();
                                        }
                                }

                        }finally {
                                latch.countDown();
                        }                       
                }
                public Long call() throws Exception {
                        run();
                        return totalTimeinNano;
                }
        }       

        static class MyCallback implements Callback {
                public void onCompletion(RecordMetadata metadata, Exception 
exception) {
                        if(exception != null){
                                System.err.println("Msg dropped..!");
                                exception.printStackTrace();
                        }
                        
                }
        }
        
}
{code}

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1710
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1710
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>         Environment: Development
>            Reporter: Bhavesh Mistry
>            Assignee: Ewen Cheslack-Postava
>            Priority: Critical
>              Labels: performance
>         Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send message to single partition for 3 minutes or so 
> on, I have encounter deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> Thanks,
> Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to