[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174278#comment-14174278
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:33 PM:
-----------------------------------------------------------------

[~ewencp],

Thanks for looking into this.  If you look at the thread dumps, you will see 
the blocked threads as well.  This particular test code exposes the thread 
contention in the Kafka producer.  We hit this issue whenever we aggregate 
events to send to the same partition, regardless of the number of producers.  
It would be great if you could look into an alternative implementation to the 
synchronized block, which is the root of the problem:
{code title=RecordAccumulator.java|borderStyle=solid}
synchronized (dq) {
      
}
{code}
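
To make the contention concrete, here is a minimal, self-contained sketch of the pattern. It is not the actual RecordAccumulator code: the class `SingleDequeContentionDemo` and its methods are hypothetical names used only for illustration. It assumes one deque per partition guarded by its own monitor, so when every sender targets the same partition, every append has to go through the same lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an accumulator with a single partition; not Kafka code.
class SingleDequeContentionDemo {
    // The only deque: every sender thread must take this monitor to append.
    private final Deque<byte[]> dq = new ArrayDeque<byte[]>();

    void append(byte[] value) {
        synchronized (dq) {
            dq.addLast(value);
        }
    }

    int size() {
        synchronized (dq) {
            return dq.size();
        }
    }

    // Starts `threads` senders that all append to the same deque and
    // returns the number of records appended once they have finished.
    static int run(int threads, final int recordsPerThread) {
        final SingleDequeContentionDemo acc = new SingleDequeContentionDemo();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(new Runnable() {
                public void run() {
                    for (int i = 0; i < recordsPerThread; i++) {
                        acc.append(new byte[8]);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("senders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acc.size();
    }

    public static void main(String[] args) {
        System.out.println("appended: " + run(8, 10000));
    }
}
```

The result is correct, but the appends are serialized: with many sender threads, a profiler would show most of them blocked on the monitor of `dq` while one thread is inside the block.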

Do you think it would be better to do it the following way?
{code title=KafkaAsyncProducer.java|borderStyle=solid }
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;

public class KafkaAsyncProducer implements Producer {
        
        // TODO configure this queue
        private final LinkedBlockingQueue<ProducerRecord> asyncQueue; 
        private final KafkaProducer producer;
        private final List<Thread> threadList;
        private final CountDownLatch latch;

        private final AtomicBoolean close = new AtomicBoolean(false);
        
        public KafkaAsyncProducer(int capacity, int numberOfDrainThreads,
                        Properties configFile ){
                if(configFile == null){
                        throw new NullPointerException("Producer configuration cannot be null");
                }
                // set the capacity for the queue
                asyncQueue = new LinkedBlockingQueue<ProducerRecord>(capacity);
                producer = new KafkaProducer(configFile);
                threadList = new ArrayList<Thread>(numberOfDrainThreads);
                latch = new CountDownLatch(numberOfDrainThreads);
                // start the drain threads...
                for(int i =0 ; i < numberOfDrainThreads ; i ++){
                        Thread th = new Thread(new ConsumerThread(),"Kafka_Drain-" +i);
                        th.setDaemon(true);
                        threadList.add(th);
                        th.start();
                }
                
        }
        


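        public Future<RecordMetadata> send(ProducerRecord record) {
                if(record == null){
                        throw new NullPointerException("Null record cannot be sent.");
                }
                if(close.get()){
                        throw new KafkaException("Producer already closed or in the process of closing...");
                }
                try {
                        // blocks while the queue is full
                        asyncQueue.put(record);
                } catch (InterruptedException e) {
                        // restore the interrupt flag and report the failed enqueue
                        Thread.currentThread().interrupt();
                        throw new KafkaException("Interrupted while enqueuing record", e);
                }
                // TODO the record is only queued here, so no Future is available yet
                return null;
        }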

        public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                throw new UnsupportedOperationException("Send not supported");
        }

        public List<PartitionInfo> partitionsFor(String topic) {
                // delegate to the wrapped producer instead of returning null
                return producer.partitionsFor(topic);
        }

        public Map<String, ? extends Metric> metrics() {
                
                return producer.metrics();
        }

        public void close() {
                if(!close.compareAndSet(false, true)){
                        // already closed or closing
                        return;
                }
                try {
                        // wait for drain threads to finish
                        latch.await();
                } catch (InterruptedException e) {
                        // restore the interrupt flag and still flush what is left
                        Thread.currentThread().interrupt();
                }
                // now drain the remaining messages....
                ProducerRecord record;
                while((record = asyncQueue.poll()) != null){
                        producer.send(record);
                }
                producer.close();
        }

        private class ConsumerThread implements Runnable{
                public void run() {
                        try{
                                while(!close.get()){
                                        try {
                                                // poll with a timeout instead of take(), so the thread
                                                // re-checks the close flag and cannot block forever
                                                // on an empty queue
                                                ProducerRecord record = asyncQueue.poll(100,
                                                                java.util.concurrent.TimeUnit.MILLISECONDS);
                                                if(record != null ){
                                                        producer.send(record);
                                                }
                                        } catch (InterruptedException e) {
                                                // interrupted: stop draining, close() flushes the rest
                                                Thread.currentThread().interrupt();
                                                break;
                                        }
                                }
                        }finally{
                                latch.countDown();
                        }
                }
        }
        
}
{code}

We really need an asynchronous send path for the aggregation use case.  Please 
note that the above code is still a work in progress.
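
For reference, the shutdown handshake of this queue-plus-drain-threads idea can be sketched without any Kafka dependency. This is only an illustration under stated assumptions: `QueueDrainSketch`, `Sink`, `submit` and `demo` are hypothetical names, `Sink` stands in for the wrapped producer, and records are plain strings. The drain threads poll with a timeout and keep running until the producer is closed and the queue is empty, so `close()` cannot hang on an idle queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a bounded queue drained by background threads.
class QueueDrainSketch {
    // Stand-in for the wrapped producer's send(record); an assumption, not a Kafka API.
    interface Sink {
        void send(String record);
    }

    private final BlockingQueue<String> queue;
    private final List<Thread> drainers = new ArrayList<Thread>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Sink sink;

    QueueDrainSketch(int capacity, int drainThreads, Sink sink) {
        this.queue = new LinkedBlockingQueue<String>(capacity);
        this.sink = sink;
        for (int i = 0; i < drainThreads; i++) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    drainLoop();
                }
            }, "drain-" + i);
            t.setDaemon(true);
            drainers.add(t);
            t.start();
        }
    }

    // Blocks while the queue is full; returns false if closed or interrupted.
    boolean submit(String record) {
        if (closed.get()) {
            return false;
        }
        try {
            queue.put(record);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void drainLoop() {
        try {
            // Keep draining until close() was called AND nothing is left.
            while (!closed.get() || !queue.isEmpty()) {
                String record = queue.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    sink.send(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void close() {
        closed.set(true);
        for (Thread t : drainers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Flush anything enqueued by a submit() that raced with close().
        String record;
        while ((record = queue.poll()) != null) {
            sink.send(record);
        }
    }

    // Submits `records` events, closes, and returns how many reached the sink.
    static int demo(int records) {
        final AtomicInteger delivered = new AtomicInteger();
        QueueDrainSketch q = new QueueDrainSketch(16, 2, new Sink() {
            public void send(String r) {
                delivered.incrementAndGet();
            }
        });
        for (int i = 0; i < records; i++) {
            q.submit("event-" + i);
        }
        q.close();
        return delivered.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered: " + demo(1000));
    }
}
```

The trade-off of polling with a timeout is a small wake-up cost on an idle queue in exchange for a shutdown that needs neither interrupts nor sentinel records.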

Thanks,

Bhavesh 


> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1710
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1710
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>         Environment: Development
>            Reporter: Bhavesh Mistry
>            Priority: Critical
>              Labels: performance
>         Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send messages to a single partition for 3 minutes or 
> so, I encounter a deadlock (please see the attached screenshots) and thread 
> contention in YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into the same partition for metric counting. 
> 2)  Replicating the old producer's behavior of sticking to a partition for 3 
> minutes.
> Here is the output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> Thanks,
> Bhavesh 


