[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14259235#comment-14259235 ]

Bhavesh Mistry commented on KAFKA-1788:
---------------------------------------

My use case is a little different, but similar:
1) Suppose we have 3 brokers (b1, b2, b3) and a topic with 30 partitions and a replication factor of 1. Partitions 1 to 10 are on b1 (b1 is their leader), partitions 11 to 20 are on b2, and partitions 21 to 30 are on b3. ZooKeeper has all the leadership info and everything is fine.
2) From the client's side everything works, except that broker b1 is not reachable (due to a network or firewall issue). Note that the leader is still reported as b1.
3) The patch you have provided will not address this case, because it detects that the leader is not available and then purges the batch. My case is a little different: the leader is available, but the client cannot connect to it, for example because a firewall rule is in place.
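To make the scenario concrete, here is a small stdlib-only sketch that computes which partitions sit behind the unreachable leader under the layout described in item 1. The class and method names (PartitionLayout, leaderOf, partitionsLedBy) are hypothetical and not part of the Kafka client API; the sketch simply assumes the contiguous 10-partitions-per-broker assignment from the example. With a replication factor of 1 there is no other replica to fail over to, so records for these partitions cannot be delivered while b1 is unreachable.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class PartitionLayout {
    // Hypothetical helper: leader of a 1-based partition under the example layout
    // 1-10 -> b1, 11-20 -> b2, 21-30 -> b3 (replication factor 1).
    static String leaderOf(int partition, int partitionsPerBroker) {
        int brokerIndex = (partition - 1) / partitionsPerBroker;
        return "b" + (brokerIndex + 1);
    }

    // Hypothetical helper: all partitions whose only replica, and hence leader, is the given broker.
    static List<Integer> partitionsLedBy(String broker, int numPartitions, int partitionsPerBroker) {
        List<Integer> result = new ArrayList<>();
        for (int p = 1; p <= numPartitions; p++) {
            if (leaderOf(p, partitionsPerBroker).equals(broker)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // ZooKeeper still reports b1 as leader, but the client cannot connect to it.
        List<Integer> stuck = partitionsLedBy("b1", 30, 10);
        System.out.println("Partitions behind unreachable b1: " + stuck);
    }
}
{code}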

Based on the above use case, I see the following two problems, which I have also reported; please refer to KAFKA-1642 for more details.

1) The record-error-rate metric remains zero despite the following firewall rules. In my opinion, the producer should have invoked org.apache.kafka.clients.producer.Callback with an exception, but I did not see that happen when either one or two brokers were not reachable.

{code}

sudo ipfw add reject tcp from me to b1.ip dst-port 9092
sudo ipfw add reject tcp from me to b2.ip dst-port 9092

00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}

{code}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

class LoggingCallBackHandler implements Callback {

    /**
     * A callback method the user can implement to provide asynchronous
     * handling of request completion. This method will be called when the
     * record sent to the server has been acknowledged. Exactly one of the
     * arguments will be non-null.
     *
     * @param metadata
     *            The metadata for the record that was sent (i.e. the
     *            partition and offset). Null if an error occurred.
     * @param exception
     *            The exception thrown during processing of this record.
     *            Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Only failures are logged; successful sends are ignored.
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}
{code}

I do not see any exception on the console at all, and I am not sure why.
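One way to at least notice this silent failure from the application side is to not rely solely on the callback, and instead bound the wait on the Future that KafkaProducer.send(record, callback) returns, using java.util.concurrent.Future.get(timeout, unit). The sketch below is stdlib-only and hypothetical: awaitAck is not a Kafka method, and a never-completed CompletableFuture stands in for a record that is stuck in the producer while its leader is unreachable. It does not make the callback fire; it only keeps the caller from waiting indefinitely.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedSendWait {
    // Hypothetical helper: waits at most timeoutMs for an acknowledgment and
    // returns a description of the outcome instead of blocking indefinitely.
    static <T> String awaitAck(Future<T> ack, long timeoutMs) {
        try {
            return "acked: " + ack.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Not completed in time: in the scenario above the record may still be
            // queued in the producer, so no callback has been invoked yet either.
            return "timed out after " + timeoutMs + " ms";
        } catch (ExecutionException e) {
            // Completed with an error; the cause is what a callback would receive.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send() to an unreachable leader: never completes.
        CompletableFuture<String> stuck = new CompletableFuture<>();
        System.out.println(awaitAck(stuck, 200));

        // Stand-in for a send() that completed with an error.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("connection refused"));
        System.out.println(awaitAck(failed, 200));

        // Stand-in for an acknowledged send().
        System.out.println(awaitAck(CompletableFuture.completedFuture("partition=12, offset=42"), 200));
    }
}
{code}

A timeout here only tells the caller that no acknowledgment arrived in time; what to do next (retry, alert, drop the record) is left to the application.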

2) The application does NOT shut down gracefully when one or more brokers are not reachable.

{code}
"SIGTERM handler" daemon prio=5 tid=0x00007f8bd79e4000 nid=0x17907 waiting for monitor entry [0x000000011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x000000011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdd147800 nid=0x15d0b waiting for monitor entry [0x000000011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdf820000 nid=0x770b waiting for monitor entry [0x000000011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdc393800 nid=0x1c30f waiting for monitor entry [0x000000011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"Thread-4" prio=5 tid=0x00007f8bdb39f000 nid=0xa107 in Object.wait() [0x000000011ea89000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.$$YJP$$wait(Native Method)
        at java.lang.Object.wait(Object.java)
        at java.lang.Thread.join(Thread.java:1280)
        - locked <0x0000000705c2f650> (a org.apache.kafka.common.utils.KafkaThread)
        at java.lang.Thread.join(Thread.java:1354)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:322)
        at 

"kafka-producer-network-thread | error" daemon prio=5 tid=0x00007f8bd814e000 nid=0x7403 runnable [0x000000011e6c0000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.$$YJP$$kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:200)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x0000000705c109f8> (a sun.nio.ch.Util$2)
        - locked <0x0000000705c109e8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000705c105c8> (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.apache.kafka.common.network.Selector.select(Selector.java:322)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:212)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:744)
{code}
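One reading of the dump: Thread-4 (presumably a shutdown hook, since every SIGTERM handler thread is blocked waiting for the java.lang.Shutdown lock) is inside KafkaProducer.close() at Thread.join(), waiting for the producer's network thread, which is still RUNNABLE in Selector.poll(). An unbounded join() therefore holds up JVM shutdown for as long as that thread keeps running. The stdlib-only sketch below illustrates the general alternative of signalling a worker and then joining with a timeout; startWorker, close and the ignoreStop flag are hypothetical names chosen for illustration, not the producer's actual shutdown logic.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedClose {
    // Hypothetical background loop standing in for a network thread.
    // If ignoreStop is true it never honours the stop flag, like a thread
    // that keeps polling because it still has work it cannot complete.
    static Thread startWorker(AtomicBoolean running, boolean ignoreStop) {
        Thread t = new Thread(() -> {
            while (running.get() || ignoreStop) {
                try {
                    Thread.sleep(10); // one simulated poll() iteration
                } catch (InterruptedException e) {
                    return; // exit promptly if interrupted
                }
            }
        }, "worker");
        t.setDaemon(true); // a stuck worker must not keep the JVM alive
        t.start();
        return t;
    }

    // Hypothetical close: signals the worker to stop, then waits at most
    // timeoutMs rather than calling join() with no limit.
    // Returns true if the worker has exited.
    static boolean close(Thread worker, AtomicBoolean running, long timeoutMs) {
        running.set(false);
        try {
            worker.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        AtomicBoolean r1 = new AtomicBoolean(true);
        Thread cooperative = startWorker(r1, false);
        System.out.println("cooperative worker stopped: " + close(cooperative, r1, 500));

        AtomicBoolean r2 = new AtomicBoolean(true);
        Thread stuck = startWorker(r2, true);
        System.out.println("stuck worker stopped: " + close(stuck, r2, 500));
    }
}
{code}

The cooperative worker exits shortly after the flag is cleared, while the stuck one is abandoned after about 500 ms, so the caller can log the problem and continue shutting down instead of hanging. What happens to records still pending at that point is a separate policy decision.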


> producer record can stay in RecordAccumulator forever if leader is not available
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-1788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1788
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 0.8.2
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>              Labels: newbie++
>             Fix For: 0.8.3
>
>         Attachments: KAFKA-1788.patch
>
>
> In the new producer, when a partition has no leader for a long time (e.g., 
> all replicas are down), the records for that partition will stay in the 
> RecordAccumulator until the leader is available. This may cause the 
> bufferpool to be full and the callback for the produced message to block for 
> a long time.
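In the issue as described, nothing bounds how long a record can wait in the RecordAccumulator, so its callback never fires and its buffer space is never released. A minimal stdlib-only model of one way to bound that, expiring records that have waited longer than a configured time and failing their callbacks, is sketched below. ExpiringAccumulator, Pending, append and expire are hypothetical; this is not the RecordAccumulator API or the KAFKA-1788 patch, and time is passed in explicitly to keep the model deterministic.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class ExpiringAccumulator {
    // Hypothetical queued record: payload, enqueue time and a failure callback.
    static final class Pending {
        final String payload;
        final long enqueuedAtMs;
        final Consumer<Exception> onFailure;

        Pending(String payload, long enqueuedAtMs, Consumer<Exception> onFailure) {
            this.payload = payload;
            this.enqueuedAtMs = enqueuedAtMs;
            this.onFailure = onFailure;
        }
    }

    private final List<Pending> pending = new ArrayList<>();
    private final long expiryMs;

    ExpiringAccumulator(long expiryMs) {
        this.expiryMs = expiryMs;
    }

    // Queues a record; in this model it stays queued until expired (sending is not modelled).
    void append(String payload, long nowMs, Consumer<Exception> onFailure) {
        pending.add(new Pending(payload, nowMs, onFailure));
    }

    // Meant to be called periodically, e.g. once per send-loop iteration.
    // Removes every record that has waited longer than expiryMs and fails its
    // callback, so the application hears about it and the slot is released.
    int expire(long nowMs) {
        int expired = 0;
        Iterator<Pending> it = pending.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            long waitedMs = nowMs - p.enqueuedAtMs;
            if (waitedMs > expiryMs) {
                it.remove();
                p.onFailure.accept(new TimeoutException(
                        "Expired " + p.payload + " after " + waitedMs + " ms in the accumulator"));
                expired++;
            }
        }
        return expired;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        ExpiringAccumulator acc = new ExpiringAccumulator(30_000);
        List<String> errors = new ArrayList<>();
        acc.append("r1", 0, e -> errors.add(e.getMessage()));
        acc.append("r2", 25_000, e -> errors.add(e.getMessage()));
        // At t = 31 s, r1 has waited 31 s (expired) and r2 only 6 s (kept).
        int n = acc.expire(31_000);
        System.out.println(n + " expired, " + acc.size() + " pending, errors = " + errors);
    }
}
{code}

Whether such expiry should apply only when the leader is unknown (the case in this issue) or also when a known leader is unreachable (the case in the comment above) is exactly the distinction the comment raises.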



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
