This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7b06a24 MINOR: Restore interrupt status when closing (#9863)
7b06a24 is described below
commit 7b06a2417df0d098b8947327f77629f16d2f5e49
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Jan 22 21:15:52 2021 +0100
MINOR: Restore interrupt status when closing (#9863)
We do not always own the thread that executes the close() method, i.e., we
do not know the interruption policy of the thread. Thus, we should not swallow
the interruption. The least we can do is restoring the interruption status
before the current thread exits this method.
Reviewers: Walker Carlson <[email protected]>, Anna Sophie Blee-Goldman
<[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 33 ++++++++++++++--------
.../streams/processor/internals/StreamThread.java | 21 ++++++++++----
2 files changed, 38 insertions(+), 16 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ad9cdbe..aaa6e2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -250,20 +250,31 @@ public class KafkaStreams implements AutoCloseable {
private boolean waitOnState(final State targetState, final long waitMs) {
final long begin = time.milliseconds();
synchronized (stateLock) {
+ boolean interrupted = false;
long elapsedMs = 0L;
- while (state != targetState) {
- if (waitMs > elapsedMs) {
- final long remainingMs = waitMs - elapsedMs;
- try {
- stateLock.wait(remainingMs);
- } catch (final InterruptedException e) {
- // it is ok: just move on to the next iteration
+ try {
+ while (state != targetState) {
+ if (waitMs > elapsedMs) {
+ final long remainingMs = waitMs - elapsedMs;
+ try {
+ stateLock.wait(remainingMs);
+ } catch (final InterruptedException e) {
+ interrupted = true;
+ }
+ } else {
+ log.debug("Cannot transit to {} within {}ms",
targetState, waitMs);
+ return false;
}
- } else {
- log.debug("Cannot transit to {} within {}ms", targetState,
waitMs);
- return false;
+ elapsedMs = time.milliseconds() - begin;
+ }
+ } finally {
+ // Make sure to restore the interruption status before
returning.
+ // We do not always own the current thread that executes this
method, i.e., we do not know the
+ // interruption policy of the thread. The least we can do is
restore the interruption status before
+ // the current thread exits this method.
+ if (interrupted) {
+ Thread.currentThread().interrupt();
}
- elapsedMs = time.milliseconds() - begin;
}
return true;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0d15647..559f8fc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -612,11 +612,22 @@ public class StreamThread extends Thread {
public void waitOnThreadState(final StreamThread.State targetState) {
synchronized (stateLock) {
- while (state != targetState) {
- try {
- stateLock.wait();
- } catch (final InterruptedException e) {
- // it is ok: just move on to the next iteration
+ boolean interrupted = false;
+ try {
+ while (state != targetState) {
+ try {
+ stateLock.wait();
+ } catch (final InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ // Make sure to restore the interruption status before
returning.
+ // We do not always own the current thread that executes this
method, i.e., we do not know the
+ // interruption policy of the thread. The least we can do is
restore the interruption status before
+ // the current thread exits this method.
+ if (interrupted) {
+ Thread.currentThread().interrupt();
}
}
}