This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new d410fc253 IMPALA-14788: Prevent Thread interrupt after completion
d410fc253 is described below
commit d410fc253db1a6773bf7682d26803f0894c5d717
Author: Michael Smith <[email protected]>
AuthorDate: Thu Feb 26 09:09:02 2026 -0800
IMPALA-14788: Prevent Thread interrupt after completion
When cancelling a query, cancel() now sets the cancellation flag and
interrupts the thread inside computeIfPresent. This prevents interrupting
the thread after it has finished all its work, which would leave the
thread in an interrupted state if it is re-used later. Also ensures the
thread's interrupted status is cleared after cancellable work is done, and
moves logging after the computeIfPresent call to avoid logging while
holding a lock.
Change-Id: I54af34d253511f18f59b5cd43f1cf57a26a772eb
Reviewed-on: http://gerrit.cloudera.org:8080/24044
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../java/org/apache/impala/service/Canceller.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/Canceller.java b/fe/src/main/java/org/apache/impala/service/Canceller.java
index a51d17796..216a31d6a 100644
--- a/fe/src/main/java/org/apache/impala/service/Canceller.java
+++ b/fe/src/main/java/org/apache/impala/service/Canceller.java
@@ -66,6 +66,8 @@ public final class Canceller {
Preconditions.checkState(curr.first == Thread.currentThread());
Preconditions.checkState(curr.second == cancelled_.get());
cancelled_.remove();
+ // Clear interrupt status of the thread to prevent it from affecting future work.
+ Thread.interrupted();
}
}
@@ -85,18 +87,22 @@ public final class Canceller {
*/
public static void cancel(TUniqueId queryId) {
if (queryId == null) return;
- Pair<Thread, AtomicBoolean> queryPair = queryThreads_.get(queryId);
- if (queryPair == null) {
+ Pair<Thread, AtomicBoolean> queryPair = queryThreads_.computeIfPresent(queryId,
+ (k, v) -> {
+ // Set cancellation flag and interrupt the thread. Prevents access to
+ // queryPair while interrupting to prevent a race in close() between remove()
+ // and completing execution of the thread.
+ v.second.set(true);
+ v.first.interrupt();
+ return v;
+ });
+ if (queryPair != null) {
+ LOG.debug(
+ "Cancelled request: thread {} for query {}", queryPair.first, PrintId(queryId));
+ } else {
LOG.info(
"Unable to cancel request: thread for query {} not found", PrintId(queryId));
- return;
}
-
- Thread queryThread = queryPair.first;
- LOG.debug(
- "Cancelling request: thread {} for query {}", queryThread, PrintId(queryId));
- queryPair.second.set(true);
- queryThread.interrupt();
}
/**