This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f83b503b1858d406634de923f53403b14d18ba74 Author: Michael Blow <[email protected]> AuthorDate: Tue Aug 1 23:03:48 2023 -0400 [NO ISSUE][HYR] Avoid hang in SuperActivityOperatorNodePushable on NC shutdown Change-Id: Ic25d0b8722055c5abbda0de1f5475c14b68ae001 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17687 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../api/rewriter/runtime/SuperActivityOperatorNodePushable.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index e43d72aaba..10af2fdbd8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -30,8 +30,10 @@ import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import org.apache.commons.collections4.MapUtils; @@ -240,6 +242,12 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable } } catch (ExecutionException e) { root = e.getCause(); + } catch (CancellationException | RejectedExecutionException e) { + root = e; + // if a task has been cancelled or was rejected for execution, the executor has shut down and will no longer + // start tasks; adjust the semaphores accordingly to allow cancelTasks() to run without getting blocked. + completeSemaphore.release(-startSemaphore.drainPermits() + 1); + startSemaphore.release(); } catch (Throwable e) { // NOSONAR: Must catch all causes of failure root = e; }
