This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 12527d7d17fbdf8172e6f14472ddb10f6e7e896c
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sat Feb 22 14:01:15 2025 -0800

    [NO ISSUE][HYR] Keep trying to cancel job tasks
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    When cancelling a job's tasks by interrupting them, a task sometimes
    does not respond to the interrupt and continues to work/wait, causing
    the job cancellation to get stuck. A task should normally respond to
    the interrupt; however, there have been cases where the interrupt
    status is cleared by ill-behaved code within the task. To account for
    such cases, keep re-cancelling the tasks so that the interrupt status
    is set again. Log a warning, since this typically indicates a code issue.
    
    Ext-ref: MB-65432
    
    Change-Id: I51fccbceeed0222aeedbaa7b6f138f3ed3e7c44d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465
    Reviewed-by: Michael Blow <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../runtime/SuperActivityOperatorNodePushable.java | 36 ++++++++++++++++++++--
 1 file changed, 33 insertions(+), 3 deletions(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 17f5cb1ea0..e031284289 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -51,12 +52,17 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
  * connected activities in a single thread.
  */
 public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private static final String CLASS_ABBREVIATION = "SAO";
     private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
     private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
@@ -264,13 +270,37 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
     }
 
     private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
+        boolean cancelCompleted = false;
         try {
             startSemaphore.acquireUninterruptibly();
-            for (Future<Void> task : tasks) {
-                task.cancel(true);
+            cancelCompleted = cancelTasks(tasks, completeSemaphore);
+        } finally {
+            if (!cancelCompleted) {
+                completeSemaphore.acquireUninterruptibly();
+            }
+        }
+    }
+
+    private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                for (Future<Void> task : tasks) {
+                    task.cancel(true);
+                }
+                try {
+                    if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
+                        return true;
+                    }
+                    LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
             }
         } finally {
-            completeSemaphore.acquireUninterruptibly();
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 }

Reply via email to