This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit cd2ae755a918d8de0c737850b4d63d9281af3b87
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sat Feb 22 22:39:00 2025 -0800

    [NO ISSUE][HYR] Retry cancelling tasks only after the timeout
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Instead of retrying the task cancellation on either
    the 5-minute timeout or an interrupt, retry only after the timeout.
    This avoids retrying on every interrupt that may be delivered to
    the Super Activity.
    
    Ext-ref: MB-65432
    
    Change-Id: Ie585127fe30904f5126bae8867b94ea12cd45762
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19466
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../runtime/SuperActivityOperatorNodePushable.java | 38 ++++++++++++----------
 1 file changed, 21 insertions(+), 17 deletions(-)

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index e031284289..a4dd9db360 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -52,6 +52,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.util.Span;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -282,25 +284,27 @@ public class SuperActivityOperatorNodePushable implements 
IOperatorNodePushable
     }
 
     private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore 
completeSemaphore) {
-        boolean interrupted = Thread.interrupted();
-        try {
-            while (true) {
-                for (Future<Void> task : tasks) {
-                    task.cancel(true);
-                }
-                try {
-                    if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
-                        return true;
-                    }
-                    LOGGER.warn("not all tasks were cancelled within 5 
minutes. retrying cancelling...");
-                } catch (InterruptedException e) {
-                    interrupted = true;
-                }
+        Span retryWait = Span.init(5, TimeUnit.MINUTES);
+        while (true) {
+            for (Future<Void> task : tasks) {
+                task.cancel(true);
             }
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
+            if (acquireUninterruptibly(completeSemaphore, retryWait)) {
+                return true;
             }
+            LOGGER.warn("not all tasks were cancelled within 5 minutes. 
retrying cancelling...");
         }
     }
+
+    private static boolean acquireUninterruptibly(Semaphore completeSemaphore, 
Span s) {
+        s.reset();
+        return InvokeUtil.getUninterruptibly(() -> {
+            while (!s.elapsed()) {
+                if 
(completeSemaphore.tryAcquire(s.remaining(TimeUnit.NANOSECONDS), 
TimeUnit.NANOSECONDS)) {
+                    return true;
+                }
+            }
+            return false;
+        });
+    }
 }

Reply via email to