This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit cd2ae755a918d8de0c737850b4d63d9281af3b87 Author: Ali Alsuliman <[email protected]> AuthorDate: Sat Feb 22 22:39:00 2025 -0800 [NO ISSUE][HYR] Retry cancelling tasks only after the timeout - user model changes: no - storage format changes: no - interface changes: no Instead of retrying the task cancellation on either the 5-minute timeout or an interrupt, retry only after the timeout, to avoid retrying on every interrupt that the Super Activity may receive. Ext-ref: MB-65432 Change-Id: Ie585127fe30904f5126bae8867b94ea12cd45762 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19466 Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- .../runtime/SuperActivityOperatorNodePushable.java | 38 ++++++++++++---------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index e031284289..a4dd9db360 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -52,6 +52,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.api.util.InvokeUtil; +import org.apache.hyracks.util.Span; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -282,25 +284,27 @@ public class SuperActivityOperatorNodePushable implements 
IOperatorNodePushable } private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) { - boolean interrupted = Thread.interrupted(); - try { - while (true) { - for (Future<Void> task : tasks) { - task.cancel(true); - } - try { - if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) { - return true; - } - LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling..."); - } catch (InterruptedException e) { - interrupted = true; - } + Span retryWait = Span.init(5, TimeUnit.MINUTES); + while (true) { + for (Future<Void> task : tasks) { + task.cancel(true); } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); + if (acquireUninterruptibly(completeSemaphore, retryWait)) { + return true; } + LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling..."); } } + + private static boolean acquireUninterruptibly(Semaphore completeSemaphore, Span s) { + s.reset(); + return InvokeUtil.getUninterruptibly(() -> { + while (!s.elapsed()) { + if (completeSemaphore.tryAcquire(s.remaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)) { + return true; + } + } + return false; + }); + } }
