Muralidhar Basani created KAFKA-20403:
-----------------------------------------

             Summary: Streams: InterruptedException swallowed without restoring 
thread interrupt flag in multiple classes
                 Key: KAFKA-20403
                 URL: https://issues.apache.org/jira/browse/KAFKA-20403
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Muralidhar Basani
            Assignee: Muralidhar Basani


In several places in the streams module we catch InterruptedException but 
don't call Thread.currentThread().interrupt() to restore the interrupt flag. 
This goes against Java's interrupt contract: when you catch 
InterruptedException, you should either re-throw it or restore the flag so 
callers up the stack know the thread was interrupted.
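
For illustration, here is a minimal sketch of the restore-the-flag pattern. The class and the awaitQuietly helper are hypothetical and not taken from the Kafka codebase. They only show what a catch block that honours the contract might look like.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class InterruptRestoreExample {

    // Hypothetical helper: waits on a latch and reports whether it completed.
    static boolean awaitQuietly(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // await() cleared the interrupt status when it threw, so set it
            // again to let callers up the stack see the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulate an interrupt that arrives before the wait starts.
        Thread.currentThread().interrupt();
        boolean completed = awaitQuietly(new CountDownLatch(1), 1000);
        System.out.println("completed=" + completed
            + ", interrupted=" + Thread.currentThread().isInterrupted());
    }
}
```

Without the interrupt() call in the catch block, the caller would see a plain "false" and could not tell a timeout from an interruption.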
                                                        
We already do this correctly in other places, such as TaskManager.java:719, 
InternalTopicManager.java:558, and GlobalStreamThread.java:461, so this was 
probably just missed. The impact of leaving it unfixed is low, because the 
shutdown calls rely on state flags and condition signals that already take 
care of these interruptions. This is mostly about code correctness and 
consistency with the rest of the codebase, so it is low priority.
                                                        
Separately, KafkaStreamsNamedTopologyWrapper has two ex.printStackTrace() 
calls (lines 308 and 333) that should be replaced with proper logging. That 
class is deprecated and planned for removal, but we could still clean it up.
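
As a rough sketch of that cleanup, the snippet below logs the exception through a logger instead of calling printStackTrace(), and restores the interrupt flag. The surrounding code at lines 308 and 333 is not shown in this issue, so the Future-based wait and the waitFor helper are assumptions. java.util.logging is used only to keep the example self-contained, and the real class would use whatever logger it already has.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

class LoggedWaitExample {
    private static final Logger LOG =
        Logger.getLogger(LoggedWaitExample.class.getName());

    // Hypothetical helper: blocks on a future and reports success or failure.
    static boolean waitFor(Future<?> future) {
        try {
            future.get();
            return true;
        } catch (InterruptedException e) {
            // Restore the flag, then log with context instead of printStackTrace().
            Thread.currentThread().interrupt();
            LOG.log(Level.WARNING, "Interrupted while waiting for the operation", e);
            return false;
        } catch (ExecutionException e) {
            LOG.log(Level.SEVERE, "Operation failed", e.getCause());
            return false;
        }
    }
}
```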
                                                                                
                                                           
Affected files:

- streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java (lines 880, 958)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java (lines 120, 269)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java (line 149)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java (line 232)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java (lines 308, 333)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java (line 56)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (line 1788)



