This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch concurency in repository https://gitbox.apache.org/repos/asf/tomee.git
commit d05eb5dd8a73e742fc891ca154423a76ecaa7809 Author: Richard Zowalla <[email protected]> AuthorDate: Thu Apr 2 19:34:16 2026 +0200 Rewrite scheduled async with manual trigger loop and context preservation Two fixes for the last Web-profile TCK failures: 1. Context propagation: when executor is a plain MES (not MSES), extract the MES's ContextServiceImpl and compose a temporary MSES that uses the MES's context service with the default MSES's thread pool. This preserves third-party context propagation (e.g. StringContext). 2. Thread pool starvation: replace mses.schedule(Callable, Trigger) with a manual trigger loop that schedules directly on the delegate ScheduledExecutorService. This avoids TriggerTask's double context wrapping and thread consumption between trigger fires. --- .../cdi/concurrency/AsynchronousInterceptor.java | 182 ++++++++++++++++----- 1 file changed, 144 insertions(+), 38 deletions(-) diff --git a/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java index 01569d9b5d..263e34aaa5 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java @@ -18,8 +18,8 @@ package org.apache.openejb.cdi.concurrency; import jakarta.annotation.Priority; import jakarta.enterprise.concurrent.Asynchronous; +import jakarta.enterprise.concurrent.LastExecution; import jakarta.enterprise.concurrent.ManagedExecutorService; -import jakarta.enterprise.concurrent.ManagedScheduledExecutorService; import jakarta.enterprise.concurrent.Schedule; import jakarta.enterprise.concurrent.ZonedTrigger; import jakarta.interceptor.AroundInvoke; @@ -28,19 +28,27 @@ import jakarta.interceptor.InvocationContext; import org.apache.openejb.core.ivm.naming.NamingException; import org.apache.openejb.resource.thread.ManagedExecutorServiceImplFactory; import org.apache.openejb.resource.thread.ManagedScheduledExecutorServiceImplFactory; +import org.apache.openejb.threads.impl.ContextServiceImpl; +import org.apache.openejb.threads.impl.ManagedExecutorServiceImpl; +import org.apache.openejb.threads.impl.ManagedScheduledExecutorServiceImpl; import org.apache.openejb.util.LogCategory; import org.apache.openejb.util.Logger; import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Arrays; +import java.util.Date; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @Interceptor @@ -113,25 +121,14 @@ public class AsynchronousInterceptor { final Schedule[] schedules) throws Exception { // Per spec, the executor attribute may reference either a ManagedScheduledExecutorService // or a plain ManagedExecutorService. When a plain MES is referenced, fall back to the - // default MSES for scheduling capability (the trigger mechanism requires MSES). - ManagedScheduledExecutorService mses; - try { - mses = ManagedScheduledExecutorServiceImplFactory.lookup(asynchronous.executor()); - } catch (final IllegalArgumentException e) { - // The executor might be a plain ManagedExecutorService — verify it exists, - // then use the default MSES for scheduling - try { - ManagedExecutorServiceImplFactory.lookup(asynchronous.executor()); - // MES exists — use default MSES for scheduling - mses = ManagedScheduledExecutorServiceImplFactory.lookup( - "java:comp/DefaultManagedScheduledExecutorService"); - } catch (final Exception fallbackEx) { - throw new RejectedExecutionException("Cannot lookup executor for scheduled async method", e); - } - } + // default MSES for scheduling capability but preserve the MES's context service. + final ManagedScheduledExecutorServiceImpl mses = resolveMses(asynchronous.executor()); final ZonedTrigger trigger = ScheduleHelper.toTrigger(schedules); final boolean isVoid = ctx.getMethod().getReturnType() == Void.TYPE; + final ContextServiceImpl ctxService = (ContextServiceImpl) mses.getContextService(); + final ContextServiceImpl.Snapshot snapshot = ctxService.snapshot(null); + final ScheduledExecutorService delegate = mses.getDelegate(); // A single CompletableFuture represents ALL executions in the schedule. // Per spec: "A single future represents the completion of all executions in the schedule." @@ -141,51 +138,160 @@ public class AsynchronousInterceptor { // - the future is completed (via Asynchronous.Result.complete()) or cancelled final CompletableFuture<Object> outerFuture = mses.newIncompleteFuture(); final AtomicReference<ScheduledFuture<?>> scheduledRef = new AtomicReference<>(); + final AtomicReference<LastExecution> lastExecutionRef = new AtomicReference<>(); + + // Schedule the first execution via the manual trigger loop + scheduleNextExecution(delegate, snapshot, ctxService, trigger, outerFuture, + ctx, isVoid, scheduledRef, lastExecutionRef); + + // Cancel the underlying scheduled task when the future completes externally + // (e.g. Asynchronous.Result.complete() or cancel()) + outerFuture.whenComplete((final Object val, final Throwable err) -> { + final ScheduledFuture<?> sf = scheduledRef.get(); + if (sf != null) { + sf.cancel(false); + } + }); + + return isVoid ? null : outerFuture; + } + + private ManagedScheduledExecutorServiceImpl resolveMses(final String executorName) { + try { + return ManagedScheduledExecutorServiceImplFactory.lookup(executorName); + } catch (final IllegalArgumentException e) { + // The executor might be a plain ManagedExecutorService — verify it exists, + // then use the default MSES for scheduling with the MES's context service + try { + final ManagedExecutorServiceImpl plainMes = ManagedExecutorServiceImplFactory.lookup(executorName); + final ContextServiceImpl mesContextService = (ContextServiceImpl) plainMes.getContextService(); + final ManagedScheduledExecutorServiceImpl defaultMses = + ManagedScheduledExecutorServiceImplFactory.lookup("java:comp/DefaultManagedScheduledExecutorService"); + return new ManagedScheduledExecutorServiceImpl(defaultMses.getDelegate(), mesContextService); + } catch (final Exception fallbackEx) { + throw new RejectedExecutionException("Cannot lookup executor for scheduled async method", e); + } + } + } + + private void scheduleNextExecution(final ScheduledExecutorService delegate, final ContextServiceImpl.Snapshot snapshot, + final ContextServiceImpl ctxService, final ZonedTrigger trigger, + final CompletableFuture<Object> future, final InvocationContext ctx, + final boolean isVoid, final AtomicReference<ScheduledFuture<?>> scheduledRef, + final AtomicReference<LastExecution> lastExecutionRef) { + final ZonedDateTime taskScheduledTime = ZonedDateTime.now(); + final ZonedDateTime nextRun = trigger.getNextRunTime(lastExecutionRef.get(), taskScheduledTime); + if (nextRun == null || future.isDone()) { + return; + } + + final long delayMs = Duration.between(ZonedDateTime.now(), nextRun).toMillis(); + + final ScheduledFuture<?> sf = delegate.schedule(() -> { + if (future.isDone()) { + return; + } - final ScheduledFuture<?> scheduledFuture = mses.schedule((Callable<Object>) () -> { + final ContextServiceImpl.State state = ctxService.enter(snapshot); try { - Asynchronous.Result.setFuture(outerFuture); + if (trigger.skipRun(lastExecutionRef.get(), nextRun)) { + // Skipped — reschedule for the next run + scheduleNextExecution(delegate, snapshot, ctxService, trigger, future, + ctx, isVoid, scheduledRef, lastExecutionRef); + return; + } + + final ZonedDateTime runStart = ZonedDateTime.now(); + Asynchronous.Result.setFuture(future); final Object result = ctx.proceed(); + final ZonedDateTime runEnd = ZonedDateTime.now(); + + // Track last execution for trigger computation + lastExecutionRef.set(new SimpleLastExecution(taskScheduledTime, runStart, runEnd, result)); if (isVoid) { Asynchronous.Result.setFuture(null); - return null; + scheduleNextExecution(delegate, snapshot, ctxService, trigger, future, + ctx, isVoid, scheduledRef, lastExecutionRef); + return; } // Per spec: non-null return value stops the schedule if (result != null) { - if (result instanceof CompletionStage<?> cs && result != outerFuture) { - cs.whenComplete((val, err) -> { + if (result instanceof CompletionStage<?> cs && result != future) { + cs.whenComplete((final Object val, final Throwable err) -> { if (err != null) { - outerFuture.completeExceptionally(err); + future.completeExceptionally(err); } else { - outerFuture.complete(val); + future.complete(val); } - Asynchronous.Result.setFuture(null); }); } Asynchronous.Result.setFuture(null); - - // Cancel the trigger loop — method returned non-null - final ScheduledFuture<?> sf = scheduledRef.get(); - if (sf != null) { - sf.cancel(false); - } + // Don't reschedule — method returned non-null + return; } + + Asynchronous.Result.setFuture(null); // null return: schedule continues + scheduleNextExecution(delegate, snapshot, ctxService, trigger, future, + ctx, isVoid, scheduledRef, lastExecutionRef); } catch (final Exception e) { - outerFuture.completeExceptionally(e); + future.completeExceptionally(e); Asynchronous.Result.setFuture(null); + } finally { + ctxService.exit(state); } + }, Math.max(0, delayMs), TimeUnit.MILLISECONDS); + + scheduledRef.set(sf); + } + + /** + * Simple {@link LastExecution} implementation for tracking execution history + * within the manual trigger loop. + */ + private record SimpleLastExecution(ZonedDateTime scheduledStart, ZonedDateTime runStart, + ZonedDateTime runEnd, Object result) implements LastExecution { + @Override + public String getIdentityName() { return null; - }, trigger); + } - scheduledRef.set(scheduledFuture); + @Override + public Object getResult() { + return result; + } - // Also cancel when the future completes externally (e.g. Asynchronous.Result.complete()) - outerFuture.whenComplete((val, err) -> scheduledFuture.cancel(false)); + @Override + public Date getScheduledStart() { + return Date.from(scheduledStart.toInstant()); + } - return isVoid ? null : outerFuture; + @Override + public ZonedDateTime getScheduledStart(final ZoneId zone) { + return scheduledStart.withZoneSameInstant(zone); + } + + @Override + public Date getRunStart() { + return Date.from(runStart.toInstant()); + } + + @Override + public ZonedDateTime getRunStart(final ZoneId zone) { + return runStart.withZoneSameInstant(zone); + } + + @Override + public Date getRunEnd() { + return Date.from(runEnd.toInstant()); + } + + @Override + public ZonedDateTime getRunEnd(final ZoneId zone) { + return runEnd.withZoneSameInstant(zone); + } } private Exception validate(final Method method) {
