This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 12527d7d17fbdf8172e6f14472ddb10f6e7e896c Author: Ali Alsuliman <[email protected]> AuthorDate: Sat Feb 22 14:01:15 2025 -0800 [NO ISSUE][HYR] Keep trying to cancel job tasks - user model changes: no - storage format changes: no - interface changes: no When cancelling a job's tasks by interrupting them, sometimes a task does not respond to the interrupt and continues to work/wait, causing the job cancellation to get stuck. Typically, a task should respond to the interrupt; however, there have been cases where the interrupt status is cleared by some ill-behaved part of the task. To account for such cases, keep trying to cancel the tasks to set the interrupt status again. Log a warning since this is typically a code issue. Ext-ref: MB-65432 Change-Id: I51fccbceeed0222aeedbaa7b6f138f3ed3e7c44d Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465 Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- .../runtime/SuperActivityOperatorNodePushable.java | 36 ++++++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 17f5cb1ea0..e031284289 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import 
org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.Pair; @@ -51,12 +52,17 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * The runtime of a SuperActivity, which internally executes a DAG of one-to-one * connected activities in a single thread. */ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final String CLASS_ABBREVIATION = "SAO"; private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>(); private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>(); @@ -264,13 +270,37 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable } private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) { + boolean cancelCompleted = false; try { startSemaphore.acquireUninterruptibly(); - for (Future<Void> task : tasks) { - task.cancel(true); + cancelCompleted = cancelTasks(tasks, completeSemaphore); + } finally { + if (!cancelCompleted) { + completeSemaphore.acquireUninterruptibly(); + } + } + } + + private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) { + boolean interrupted = Thread.interrupted(); + try { + while (true) { + for (Future<Void> task : tasks) { + task.cancel(true); + } + try { + if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) { + return true; + } + LOGGER.warn("not all tasks were cancelled within 5 minutes. 
retrying cancelling..."); + } catch (InterruptedException e) { + interrupted = true; + } } } finally { - completeSemaphore.acquireUninterruptibly(); + if (interrupted) { + Thread.currentThread().interrupt(); + } } } }
