This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b06a24  MINOR: Restore interrupt status when closing (#9863)
7b06a24 is described below

commit 7b06a2417df0d098b8947327f77629f16d2f5e49
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Jan 22 21:15:52 2021 +0100

    MINOR: Restore interrupt status when closing (#9863)
    
    We do not always own the thread that executes the close() method, i.e., we
    do not know the interruption policy of the thread. Thus, we should not swallow
    the interruption. The least we can do is restore the interruption status
    before the current thread exits this method.
    
    Reviewers: Walker Carlson <[email protected]>, Anna Sophie Blee-Goldman 
<[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 33 ++++++++++++++--------
 .../streams/processor/internals/StreamThread.java  | 21 ++++++++++----
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ad9cdbe..aaa6e2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -250,20 +250,31 @@ public class KafkaStreams implements AutoCloseable {
     private boolean waitOnState(final State targetState, final long waitMs) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
+            boolean interrupted = false;
             long elapsedMs = 0L;
-            while (state != targetState) {
-                if (waitMs > elapsedMs) {
-                    final long remainingMs = waitMs - elapsedMs;
-                    try {
-                        stateLock.wait(remainingMs);
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
+            try {
+                while (state != targetState) {
+                    if (waitMs > elapsedMs) {
+                        final long remainingMs = waitMs - elapsedMs;
+                        try {
+                            stateLock.wait(remainingMs);
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
+                        }
+                    } else {
+                        log.debug("Cannot transit to {} within {}ms", 
targetState, waitMs);
+                        return false;
                     }
-                } else {
-                    log.debug("Cannot transit to {} within {}ms", targetState, 
waitMs);
-                    return false;
+                    elapsedMs = time.milliseconds() - begin;
+                }
+            } finally {
+                // Make sure to restore the interruption status before 
returning.
+                // We do not always own the current thread that executes this 
method, i.e., we do not know the
+                // interruption policy of the thread. The least we can do is 
restore the interruption status before
+                // the current thread exits this method.
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
                 }
-                elapsedMs = time.milliseconds() - begin;
             }
             return true;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0d15647..559f8fc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -612,11 +612,22 @@ public class StreamThread extends Thread {
 
     public void waitOnThreadState(final StreamThread.State targetState) {
         synchronized (stateLock) {
-            while (state != targetState) {
-                try {
-                    stateLock.wait();
-                } catch (final InterruptedException e) {
-                    // it is ok: just move on to the next iteration
+            boolean interrupted = false;
+            try {
+                while (state != targetState) {
+                    try {
+                        stateLock.wait();
+                    } catch (final InterruptedException e) {
+                        interrupted = true;
+                    }
+                }
+            } finally {
+                // Make sure to restore the interruption status before 
returning.
+                // We do not always own the current thread that executes this 
method, i.e., we do not know the
+                // interruption policy of the thread. The least we can do is 
restore the interruption status before
+                // the current thread exits this method.
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
                 }
             }
         }

Reply via email to