Steven Schlansker created KAFKA-4787:
----------------------------------------

             Summary: KafkaStreams close() is not reentrant
                 Key: KAFKA-4787
                 URL: https://issues.apache.org/jira/browse/KAFKA-4787
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Steven Schlansker


While building a simple application, I tried to implement a failure policy 
where any uncaught exception terminates the application until an administrator 
can evaluate and intervene:

{code}
    /** Handle any uncaught exception by shutting down the program. */
    private void handleStreamException(Thread thread, Throwable t) {
        LOG.error("stream exception in thread {}", thread, t);
        streams.close();
    }

    streams.setUncaughtExceptionHandler(this::handleStreamException);
    streams.start();
{code}

Unfortunately, because KafkaStreams#close() takes a lock on the KafkaStreams
instance, this is prone to a deadlock:

{code}
"StreamThread-1" #80 prio=5 os_prio=0 tid=0x00007f56096f4000 nid=0x40c8 waiting for monitor entry [0x00007f54f03ee000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
        - waiting to lock <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
        at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
        at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
        at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
        at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
        at com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
        at com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown Source)
        at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)

"main" #1 prio=5 os_prio=0 tid=0x00007f5608011000 nid=0x3f76 in Object.wait() [0x00007f5610f04000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        - locked <0x00000000fd302bf0> (a java.lang.Thread)
        at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
        - locked <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
        at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
        at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
        at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
        at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
{code}

Note how the main thread calls close(), which locks the KafkaStreams instance
and then joins StreamThread-1. While shutting down, StreamThread-1 encounters an
exception and dispatches it, on its own thread, to the uncaught exception
handler, which calls close() again. Once that second call tries to take the
monitor, main is waiting for StreamThread-1 to terminate, but StreamThread-1 is
waiting for main to release the monitor.
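The lock-plus-join interaction can be reproduced without Kafka. The sketch below uses a hypothetical stand-in (ReentrantCloseDemo and Service are illustrative names, not Kafka APIs): close() is synchronized and joins a worker thread, and the worker's uncaught exception handler calls close() again. The join timeout is an assumption added only so the demo terminates; while main still holds the monitor, the worker should be observed BLOCKED, like StreamThread-1 in the dump above.

{code:java}
import java.util.concurrent.CountDownLatch;

public class ReentrantCloseDemo {

    /** Hypothetical stand-in for KafkaStreams: close() holds the instance monitor while joining its worker. */
    static final class Service {
        private final CountDownLatch closeStarted = new CountDownLatch(1);
        private final Thread worker;

        Service() {
            worker = new Thread(() -> {
                try {
                    closeStarted.await(); // fail only once close() already holds the monitor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                throw new IllegalStateException("simulated failure during shutdown");
            }, "StreamThread-1");
            worker.setDaemon(true); // do not keep the JVM alive if the worker stays blocked
            // Mirrors handleStreamException: the handler reacts to the failure by calling close() again.
            worker.setUncaughtExceptionHandler((t, e) -> close(500));
        }

        void start() {
            worker.start();
        }

        /** Joins the worker for up to joinTimeoutMs, then reports its state while still holding the monitor. */
        synchronized Thread.State close(long joinTimeoutMs) {
            closeStarted.countDown();
            try {
                worker.join(joinTimeoutMs); // the timeout only exists so this demo terminates
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return worker.getState();
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        service.start();
        Thread.State observed = service.close(500);
        System.out.println("StreamThread-1 state while main holds the monitor: " + observed);
    }
}
{code}

Running main should typically report BLOCKED: main sits inside the synchronized close() waiting on the worker, while the worker sits in its handler waiting on the same monitor. Without the timeout, neither thread could ever make progress.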

Arguably it's a bit abusive to call close() this way (it certainly wasn't
intentional), but to be robust, Kafka Streams should handle any sequence of
close() invocations gracefully, including reentrant calls from its own threads.
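One possible way for close() to tolerate reentrant calls is sketched below, again with a hypothetical stand-in rather than the actual KafkaStreams code, and as an assumption about a design rather than the fix adopted for this issue. A compare-and-set flag lets only the first caller perform the shutdown; later calls, including one from the worker's own uncaught exception handler, return immediately instead of waiting on a monitor, and close() never joins the calling thread.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReentrantSafeCloseDemo {

    /** Hypothetical service whose close() tolerates concurrent and reentrant invocations. */
    static final class Service {
        private final AtomicBoolean closing = new AtomicBoolean(false);
        private final CountDownLatch closeStarted = new CountDownLatch(1);
        private final Thread worker;

        Service() {
            worker = new Thread(() -> {
                try {
                    closeStarted.await(); // fail only once shutdown has begun
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                throw new IllegalStateException("simulated failure during shutdown");
            }, "StreamThread-1");
            worker.setDaemon(true);
            // Same handler pattern as in the report: react to a failure by calling close().
            worker.setUncaughtExceptionHandler((t, e) -> close(5_000));
        }

        void start() {
            worker.start();
        }

        /**
         * Returns true if this call performed the shutdown and the worker has terminated;
         * returns false immediately if another close() is already in progress.
         */
        boolean close(long joinTimeoutMs) {
            if (!closing.compareAndSet(false, true)) {
                return false; // no lock is taken, so a reentrant call cannot block
            }
            closeStarted.countDown();
            if (Thread.currentThread() != worker) { // never join the calling thread itself
                try {
                    worker.join(joinTimeoutMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return !worker.isAlive();
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        service.start();
        System.out.println("shutdown completed: " + service.close(5_000));
    }
}
{code}

The tradeoff is that a second caller returns before shutdown has finished; a fuller design might let external callers wait for completion while only calls from the service's own threads return early. On the application side, a possible workaround (untested here) is for the handler to hand the shutdown to a fresh thread, e.g. {{new Thread(streams::close).start()}}, so the stream thread itself never blocks on the KafkaStreams monitor and can exit.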



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
