This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch concurency in repository https://gitbox.apache.org/repos/asf/tomee.git
commit e9c554d6a9381722d6c6266873e4268ecd29d15a Author: Richard Zowalla <[email protected]> AuthorDate: Thu Apr 2 10:06:55 2026 +0200 Implement @Asynchronous(runAt=@Schedule(...)) for Jakarta Concurrency 3.1 Add support for scheduled recurring async methods as defined in Jakarta Concurrency 3.1. This addresses the largest block of TCK failures (28 tests) by enabling cron-based scheduling of CDI @Asynchronous methods via the new runAt attribute. - ScheduleHelper: maps @Schedule annotations to API-provided CronTrigger, supports composite triggers (multiple schedules) and skipIfLateBy wrapping - AsynchronousInterceptor: branches on runAt presence — one-shot path unchanged, new scheduled path uses ManagedScheduledExecutorService - ManagedScheduledExecutorServiceImplFactory: adds lookup() with graceful fallback matching the ManagedExecutorServiceImplFactory pattern --- .gitignore | 6 +- .../concurrency/ScheduledAsynchronousTest.java | 95 ++++++++ .../cdi/concurrency/AsynchronousInterceptor.java | 94 +++++++- .../openejb/cdi/concurrency/ScheduleHelper.java | 191 +++++++++++++++ ...ManagedScheduledExecutorServiceImplFactory.java | 46 ++++ .../cdi/concurrency/AsynchronousScheduledTest.java | 101 ++++++++ .../cdi/concurrency/ScheduleHelperTest.java | 258 +++++++++++++++++++++ 7 files changed, 780 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 76c4985205..4bf34949bc 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,8 @@ tck/**/temp examples/jaxrs-json-provider-jettison/temp/ transformer/jakartaee-prototype/ transformer/transformer-0.1.0-SNAPSHOT/ -*.zip \ No newline at end of file +*.zip + +CLAUDE.md +.claude +tck-dev \ No newline at end of file diff --git a/arquillian/arquillian-tomee-tests/arquillian-tomee-webprofile-tests/src/test/java/org/apache/openejb/arquillian/tests/concurrency/ScheduledAsynchronousTest.java 
b/arquillian/arquillian-tomee-tests/arquillian-tomee-webprofile-tests/src/test/java/org/apache/openejb/arquillian/tests/concurrency/ScheduledAsynchronousTest.java new file mode 100644 index 0000000000..0466b9a5bb --- /dev/null +++ b/arquillian/arquillian-tomee-tests/arquillian-tomee-webprofile-tests/src/test/java/org/apache/openejb/arquillian/tests/concurrency/ScheduledAsynchronousTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.openejb.arquillian.tests.concurrency; + +import jakarta.enterprise.concurrent.Asynchronous; +import jakarta.enterprise.concurrent.Schedule; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.shrinkwrap.api.ArchivePaths; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; + +/** + * Arquillian integration test for {@code @Asynchronous(runAt = @Schedule(...))} + * — the scheduled recurring async method feature introduced in Jakarta Concurrency 3.1. 
+ */ +@RunWith(Arquillian.class) +public class ScheduledAsynchronousTest { + + @Inject + private ScheduledBean scheduledBean; + + @Deployment + public static WebArchive createDeployment() { + return ShrinkWrap.create(WebArchive.class, "ScheduledAsynchronousTest.war") + .addClasses(ScheduledBean.class) + .addAsWebInfResource(EmptyAsset.INSTANCE, ArchivePaths.create("beans.xml")); + } + + @Test + public void scheduledVoidMethodExecutesRepeatedly() throws Exception { + scheduledBean.everySecondVoid(); + + final boolean reached = ScheduledBean.VOID_LATCH.await(10, TimeUnit.SECONDS); + assertTrue("Scheduled void method should have been invoked at least 3 times, count: " + + ScheduledBean.VOID_COUNTER.get(), reached); + } + + @Test + public void scheduledReturningMethodExecutes() throws Exception { + final CompletableFuture<String> future = scheduledBean.everySecondReturning(); + + final boolean reached = ScheduledBean.RETURNING_LATCH.await(10, TimeUnit.SECONDS); + assertTrue("Scheduled returning method should have been invoked, count: " + + ScheduledBean.RETURNING_COUNTER.get(), reached); + } + + @ApplicationScoped + public static class ScheduledBean { + static final AtomicInteger VOID_COUNTER = new AtomicInteger(); + static final CountDownLatch VOID_LATCH = new CountDownLatch(3); + + static final AtomicInteger RETURNING_COUNTER = new AtomicInteger(); + static final CountDownLatch RETURNING_LATCH = new CountDownLatch(1); + + @Asynchronous(runAt = @Schedule(cron = "* * * * * *")) + public void everySecondVoid() { + VOID_COUNTER.incrementAndGet(); + VOID_LATCH.countDown(); + } + + @Asynchronous(runAt = @Schedule(cron = "* * * * * *")) + public CompletableFuture<String> everySecondReturning() { + RETURNING_COUNTER.incrementAndGet(); + RETURNING_LATCH.countDown(); + return Asynchronous.Result.complete("done"); + } + } +} diff --git a/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java 
b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java index 04f6c46773..d896e9eaf2 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java @@ -19,16 +19,23 @@ package org.apache.openejb.cdi.concurrency; import jakarta.annotation.Priority; import jakarta.enterprise.concurrent.Asynchronous; import jakarta.enterprise.concurrent.ManagedExecutorService; +import jakarta.enterprise.concurrent.ManagedScheduledExecutorService; +import jakarta.enterprise.concurrent.Schedule; +import jakarta.enterprise.concurrent.ZonedTrigger; import jakarta.interceptor.AroundInvoke; import jakarta.interceptor.Interceptor; import jakarta.interceptor.InvocationContext; import org.apache.openejb.core.ivm.naming.NamingException; import org.apache.openejb.resource.thread.ManagedExecutorServiceImplFactory; +import org.apache.openejb.resource.thread.ManagedScheduledExecutorServiceImplFactory; +import org.apache.openejb.util.LogCategory; +import org.apache.openejb.util.Logger; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +45,8 @@ import java.util.concurrent.RejectedExecutionException; @Asynchronous @Priority(Interceptor.Priority.PLATFORM_BEFORE + 5) public class AsynchronousInterceptor { + private static final Logger LOGGER = Logger.getInstance(LogCategory.OPENEJB, AsynchronousInterceptor.class); + public static final String MP_ASYNC_ANNOTATION_NAME = "org.eclipse.microprofile.faulttolerance.Asynchronous"; // ensure validation logic required by the spec only runs once per invoked Method @@ -45,24 +54,34 @@ public class 
AsynchronousInterceptor { @AroundInvoke public Object aroundInvoke(final InvocationContext ctx) throws Exception { - Exception exception = validationCache.computeIfAbsent(ctx.getMethod(), this::validate); + final Exception exception = validationCache.computeIfAbsent(ctx.getMethod(), this::validate); if (exception != null) { throw exception; } - Asynchronous asynchronous = ctx.getMethod().getAnnotation(Asynchronous.class); - ManagedExecutorService mes; + final Asynchronous asynchronous = ctx.getMethod().getAnnotation(Asynchronous.class); + final Schedule[] schedules = asynchronous.runAt(); + + if (schedules.length > 0) { + return aroundInvokeScheduled(ctx, asynchronous, schedules); + } + + return aroundInvokeOneShot(ctx, asynchronous); + } + + private Object aroundInvokeOneShot(final InvocationContext ctx, final Asynchronous asynchronous) throws Exception { + final ManagedExecutorService mes; try { mes = ManagedExecutorServiceImplFactory.lookup(asynchronous.executor()); - } catch (NamingException | IllegalArgumentException e) { + } catch (final NamingException | IllegalArgumentException e) { throw new RejectedExecutionException("Cannot lookup ManagedExecutorService", e); } - CompletableFuture<Object> future = mes.newIncompleteFuture(); + final CompletableFuture<Object> future = mes.newIncompleteFuture(); mes.execute(() -> { try { Asynchronous.Result.setFuture(future); - CompletionStage<?> result = (CompletionStage<?>) ctx.proceed(); + final CompletionStage<?> result = (CompletionStage<?>) ctx.proceed(); if (result == null || result == future) { future.complete(result); @@ -79,7 +98,7 @@ public class AsynchronousInterceptor { Asynchronous.Result.setFuture(null); }); - } catch (Exception e) { + } catch (final Exception e) { future.completeExceptionally(e); Asynchronous.Result.setFuture(null); } @@ -88,18 +107,73 @@ public class AsynchronousInterceptor { return ctx.getMethod().getReturnType() == Void.TYPE ? 
null : future; } + private Object aroundInvokeScheduled(final InvocationContext ctx, final Asynchronous asynchronous, + final Schedule[] schedules) throws Exception { + final ManagedScheduledExecutorService mses; + try { + mses = ManagedScheduledExecutorServiceImplFactory.lookup(asynchronous.executor()); + } catch (final IllegalArgumentException e) { + throw new RejectedExecutionException("Cannot lookup ManagedScheduledExecutorService", e); + } + + final ZonedTrigger trigger = ScheduleHelper.toTrigger(schedules); + final boolean isVoid = ctx.getMethod().getReturnType() == Void.TYPE; + + if (isVoid) { + // void method: schedule as Runnable, runs indefinitely until cancelled + mses.schedule((Runnable) () -> { + try { + ctx.proceed(); + } catch (final Exception e) { + LOGGER.warning("Scheduled async method threw exception", e); + } + }, trigger); + return null; + } + + // non-void: schedule as Callable; every run shares the single outerFuture, exposed to the method via Asynchronous.Result + final CompletableFuture<Object> outerFuture = mses.newIncompleteFuture(); + + mses.schedule((Callable<Object>) () -> { + try { + Asynchronous.Result.setFuture(outerFuture); + final Object result = ctx.proceed(); + + if (result instanceof CompletionStage<?> cs) { + cs.whenComplete((val, err) -> { + if (err != null) { + outerFuture.completeExceptionally(err); + } else if (val != null) { + outerFuture.complete(val); + } + Asynchronous.Result.setFuture(null); + }); + } else if (result != null && result != outerFuture) { + outerFuture.complete(result); + Asynchronous.Result.setFuture(null); + } + } catch (final Exception e) { + outerFuture.completeExceptionally(e); + Asynchronous.Result.setFuture(null); + } + return null; + }, trigger); + + return outerFuture; + } + private Exception validate(final Method method) { if (hasMpAsyncAnnotation(method.getAnnotations()) || hasMpAsyncAnnotation(method.getDeclaringClass().getAnnotations())) { return new UnsupportedOperationException("Combining " + 
Asynchronous.class.getName() + " and " + MP_ASYNC_ANNOTATION_NAME + " on the same method/class is not supported"); } - Asynchronous asynchronous = method.getAnnotation(Asynchronous.class); + final Asynchronous asynchronous = method.getAnnotation(Asynchronous.class); if (asynchronous == null) { return new UnsupportedOperationException("Asynchronous annotation must be placed on a method"); } - Class<?> returnType = method.getReturnType(); + final Class<?> returnType = method.getReturnType(); if (returnType != Void.TYPE && returnType != CompletableFuture.class && returnType != CompletionStage.class) { return new UnsupportedOperationException("Asynchronous annotation must be placed on a method that returns either void, CompletableFuture or CompletionStage"); } @@ -107,7 +181,7 @@ public class AsynchronousInterceptor { return null; } - private boolean hasMpAsyncAnnotation(Annotation[] declaredAnnotations) { + private boolean hasMpAsyncAnnotation(final Annotation[] declaredAnnotations) { return Arrays.stream(declaredAnnotations) .map(it -> it.annotationType().getName()) .anyMatch(it -> it.equals(MP_ASYNC_ANNOTATION_NAME)); diff --git a/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/ScheduleHelper.java b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/ScheduleHelper.java new file mode 100644 index 0000000000..f1752ca290 --- /dev/null +++ b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/ScheduleHelper.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.cdi.concurrency; + +import jakarta.enterprise.concurrent.CronTrigger; +import jakarta.enterprise.concurrent.LastExecution; +import jakarta.enterprise.concurrent.Schedule; +import jakarta.enterprise.concurrent.ZonedTrigger; + +import java.time.DayOfWeek; +import java.time.Month; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; + +/** + * Maps {@link Schedule} annotations to the API-provided {@link CronTrigger}. + * Similar design pattern to {@link org.apache.openejb.core.timer.EJBCronTrigger} + * which maps EJB {@code @Schedule} to Quartz triggers. Here the API JAR provides + * the cron parsing — we just bridge the annotation attributes. + */ +public final class ScheduleHelper { + + private ScheduleHelper() { + // utility + } + + /** + * Converts a single {@link Schedule} annotation to a {@link CronTrigger}. + * If {@link Schedule#cron()} is non-empty, uses the cron expression directly. + * Otherwise builds the trigger from individual field attributes. + */ + public static CronTrigger toCronTrigger(final Schedule schedule) { + final ZoneId zone = schedule.zone().isEmpty() + ? 
ZoneId.systemDefault() + : ZoneId.of(schedule.zone()); + + final String cron = schedule.cron(); + if (!cron.isEmpty()) { + return new CronTrigger(cron, zone); + } + + final CronTrigger trigger = new CronTrigger(zone); + + if (schedule.months().length > 0) { + trigger.months(toMonths(schedule.months())); + } + if (schedule.daysOfMonth().length > 0) { + trigger.daysOfMonth(schedule.daysOfMonth()); + } + if (schedule.daysOfWeek().length > 0) { + trigger.daysOfWeek(toDaysOfWeek(schedule.daysOfWeek())); + } + if (schedule.hours().length > 0) { + trigger.hours(schedule.hours()); + } + if (schedule.minutes().length > 0) { + trigger.minutes(schedule.minutes()); + } + if (schedule.seconds().length > 0) { + trigger.seconds(schedule.seconds()); + } + + return trigger; + } + + /** + * Converts one or more {@link Schedule} annotations to a {@link ZonedTrigger}. + * A single schedule returns a potentially wrapped {@link CronTrigger}. + * Multiple schedules return a {@link CompositeScheduleTrigger} that picks the + * earliest next run time. 
+ * + * <p>The returned trigger includes {@code skipIfLateBy} logic when configured.</p> + */ + public static ZonedTrigger toTrigger(final Schedule[] schedules) { + if (schedules.length == 1) { + return wrapWithSkipIfLate(toCronTrigger(schedules[0]), schedules[0].skipIfLateBy()); + } + + final ZonedTrigger[] triggers = new ZonedTrigger[schedules.length]; + for (int i = 0; i < schedules.length; i++) { + triggers[i] = wrapWithSkipIfLate(toCronTrigger(schedules[i]), schedules[i].skipIfLateBy()); + } + return new CompositeScheduleTrigger(triggers); + } + + private static ZonedTrigger wrapWithSkipIfLate(final CronTrigger trigger, final long skipIfLateBy) { + if (skipIfLateBy <= 0) { + return trigger; + } + return new SkipIfLateTrigger(trigger, skipIfLateBy); + } + + private static Month[] toMonths(final Month[] months) { + return months; + } + + private static DayOfWeek[] toDaysOfWeek(final DayOfWeek[] days) { + return days; + } + + /** + * Wraps a {@link ZonedTrigger} to skip executions that are late by more than + * the configured threshold (in seconds). Per the spec, the default is 600 seconds. 
+ */ + static class SkipIfLateTrigger implements ZonedTrigger { + private final ZonedTrigger delegate; + private final long skipIfLateBySeconds; + + SkipIfLateTrigger(final ZonedTrigger delegate, final long skipIfLateBySeconds) { + this.delegate = delegate; + this.skipIfLateBySeconds = skipIfLateBySeconds; + } + + @Override + public ZonedDateTime getNextRunTime(final LastExecution lastExecution, final ZonedDateTime taskScheduledTime) { + return delegate.getNextRunTime(lastExecution, taskScheduledTime); + } + + @Override + public ZoneId getZoneId() { + return delegate.getZoneId(); + } + + @Override + public boolean skipRun(final LastExecution lastExecution, final ZonedDateTime scheduledRunTime) { + if (delegate.skipRun(lastExecution, scheduledRunTime)) { + return true; + } + + final ZonedDateTime now = ZonedDateTime.now(getZoneId()); + final long lateBySeconds = java.time.Duration.between(scheduledRunTime, now).getSeconds(); + return lateBySeconds > skipIfLateBySeconds; + } + } + + /** + * Combines multiple {@link ZonedTrigger} instances, picking the earliest + * next run time from all delegates. Used when multiple {@link Schedule} + * annotations are present on a single method. 
+ */ + static class CompositeScheduleTrigger implements ZonedTrigger { + private final ZonedTrigger[] delegates; + + CompositeScheduleTrigger(final ZonedTrigger[] delegates) { + this.delegates = Arrays.copyOf(delegates, delegates.length); + } + + @Override + public ZonedDateTime getNextRunTime(final LastExecution lastExecution, final ZonedDateTime taskScheduledTime) { + ZonedDateTime earliest = null; + for (final ZonedTrigger delegate : delegates) { + final ZonedDateTime next = delegate.getNextRunTime(lastExecution, taskScheduledTime); + if (next != null && (earliest == null || next.isBefore(earliest))) { + earliest = next; + } + } + return earliest; + } + + @Override + public ZoneId getZoneId() { + return delegates[0].getZoneId(); + } + + @Override + public boolean skipRun(final LastExecution lastExecution, final ZonedDateTime scheduledRunTime) { + // skip only if ALL delegates would skip + for (final ZonedTrigger delegate : delegates) { + if (!delegate.skipRun(lastExecution, scheduledRunTime)) { + return false; + } + } + return true; + } + } +} diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/thread/ManagedScheduledExecutorServiceImplFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/thread/ManagedScheduledExecutorServiceImplFactory.java index 771b160e20..4e91b81a07 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/thread/ManagedScheduledExecutorServiceImplFactory.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/thread/ManagedScheduledExecutorServiceImplFactory.java @@ -16,6 +16,8 @@ */ package org.apache.openejb.resource.thread; +import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.spi.ContainerSystem; import org.apache.openejb.threads.impl.ContextServiceImpl; import org.apache.openejb.threads.impl.ContextServiceImplFactory; import org.apache.openejb.threads.impl.ManagedScheduledExecutorServiceImpl; @@ -24,12 +26,56 @@ import 
org.apache.openejb.threads.reject.CURejectHandler; import org.apache.openejb.util.LogCategory; import org.apache.openejb.util.Logger; +import jakarta.enterprise.concurrent.ManagedScheduledExecutorService; import jakarta.enterprise.concurrent.ManagedThreadFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; public class ManagedScheduledExecutorServiceImplFactory { + + private static final Logger LOGGER = Logger.getInstance(LogCategory.OPENEJB, ManagedScheduledExecutorServiceImplFactory.class); + + public static ManagedScheduledExecutorServiceImpl lookup(String name) { + // If the caller passes the default ManagedExecutorService JNDI name, map it to the + // default ManagedScheduledExecutorService instead + if ("java:comp/DefaultManagedExecutorService".equals(name)) { + name = "java:comp/DefaultManagedScheduledExecutorService"; + } + + // Try direct JNDI lookup first + try { + final Object obj = InitialContext.doLookup(name); + if (obj instanceof ManagedScheduledExecutorServiceImpl mses) { + return mses; + } + } catch (final NamingException ignored) { + // fall through to container JNDI + } + + // Try container JNDI with resource ID + try { + final Context ctx = SystemInstance.get().getComponent(ContainerSystem.class).getJNDIContext(); + final String resourceId = "java:comp/DefaultManagedScheduledExecutorService".equals(name) + ? 
"Default Scheduled Executor Service" + : name; + + final Object obj = ctx.lookup("openejb/Resource/" + resourceId); + if (obj instanceof ManagedScheduledExecutorServiceImpl mses) { + return mses; + } + } catch (final NamingException ignored) { + // fall through to default creation + } + + // Graceful fallback: create a default instance + LOGGER.debug("Cannot lookup ManagedScheduledExecutorService '" + name + "', creating default instance"); + return new ManagedScheduledExecutorServiceImplFactory().create(); + } + private int core = 5; private String threadFactory = ManagedThreadFactoryImpl.class.getName(); diff --git a/container/openejb-core/src/test/java/org/apache/openejb/cdi/concurrency/AsynchronousScheduledTest.java b/container/openejb-core/src/test/java/org/apache/openejb/cdi/concurrency/AsynchronousScheduledTest.java new file mode 100644 index 0000000000..5e9c25cb8a --- /dev/null +++ b/container/openejb-core/src/test/java/org/apache/openejb/cdi/concurrency/AsynchronousScheduledTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.openejb.cdi.concurrency; + +import jakarta.enterprise.concurrent.Asynchronous; +import jakarta.enterprise.concurrent.Schedule; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.openejb.jee.EnterpriseBean; +import org.apache.openejb.jee.SingletonBean; +import org.apache.openejb.junit.ApplicationComposer; +import org.apache.openejb.testing.Module; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; + +@RunWith(ApplicationComposer.class) +public class AsynchronousScheduledTest { + + @Inject + private ScheduledBean scheduledBean; + + @Module + public EnterpriseBean ejb() { + // Dummy EJB to trigger full resource deployment including default concurrency resources + return new SingletonBean(DummyEjb.class).localBean(); + } + + @Module + public Class<?>[] beans() { + return new Class<?>[]{ScheduledBean.class}; + } + + @Test + public void scheduledVoidMethodExecutesRepeatedly() throws Exception { + // Call the method once — the interceptor sets up the recurring schedule + scheduledBean.everySecondVoid(); + + // Wait for at least 3 invocations + final boolean reached = ScheduledBean.VOID_LATCH.await(10, TimeUnit.SECONDS); + assertTrue("Scheduled void method should have been invoked at least 3 times, count: " + + ScheduledBean.VOID_COUNTER.get(), reached); + } + + @Test + public void scheduledReturningMethodExecutes() throws Exception { + // Call the method once — the interceptor sets up the recurring schedule + final CompletableFuture<String> future = scheduledBean.everySecondReturning(); + + // Wait for at least 1 invocation + final boolean reached = ScheduledBean.RETURNING_LATCH.await(10, TimeUnit.SECONDS); + assertTrue("Scheduled returning method should have 
been invoked, count: " + + ScheduledBean.RETURNING_COUNTER.get(), reached); + } + + @ApplicationScoped + public static class ScheduledBean { + static final AtomicInteger VOID_COUNTER = new AtomicInteger(); + static final CountDownLatch VOID_LATCH = new CountDownLatch(3); + + static final AtomicInteger RETURNING_COUNTER = new AtomicInteger(); + static final CountDownLatch RETURNING_LATCH = new CountDownLatch(1); + + @Asynchronous(runAt = @Schedule(cron = "* * * * * *")) + public void everySecondVoid() { + VOID_COUNTER.incrementAndGet(); + VOID_LATCH.countDown(); + } + + @Asynchronous(runAt = @Schedule(cron = "* * * * * *")) + public CompletableFuture<String> everySecondReturning() { + RETURNING_COUNTER.incrementAndGet(); + RETURNING_LATCH.countDown(); + return Asynchronous.Result.complete("done"); + } + } + + @jakarta.ejb.Singleton + public static class DummyEjb { + } +} diff --git a/container/openejb-core/src/test/java/org/apache/openejb/cdi/concurrency/ScheduleHelperTest.java b/container/openejb-core/src/test/java/org/apache/openejb/cdi/concurrency/ScheduleHelperTest.java new file mode 100644 index 0000000000..05930bbf86 --- /dev/null +++ b/container/openejb-core/src/test/java/org/apache/openejb/cdi/concurrency/ScheduleHelperTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.cdi.concurrency; + +import jakarta.enterprise.concurrent.CronTrigger; +import jakarta.enterprise.concurrent.LastExecution; +import jakarta.enterprise.concurrent.Schedule; +import jakarta.enterprise.concurrent.ZonedTrigger; +import org.junit.Test; + +import java.lang.annotation.Annotation; +import java.time.DayOfWeek; +import java.time.Month; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class ScheduleHelperTest { + + @Test + public void cronExpressionTrigger() { + final Schedule schedule = scheduleWithCron("* * * * *", ""); + final CronTrigger trigger = ScheduleHelper.toCronTrigger(schedule); + + assertNotNull(trigger); + final ZonedDateTime next = trigger.getNextRunTime(null, ZonedDateTime.now()); + assertNotNull("CronTrigger should compute a next run time", next); + assertTrue("Next run time should be in the future or now", + !next.isBefore(ZonedDateTime.now().minusSeconds(1))); + } + + @Test + public void cronExpressionWithZone() { + final Schedule schedule = scheduleWithCron("0 12 * * MON-FRI", "America/New_York"); + final CronTrigger trigger = ScheduleHelper.toCronTrigger(schedule); + + assertNotNull(trigger); + assertNotNull(trigger.getZoneId()); + } + + @Test + public void builderStyleTrigger() { + final Schedule schedule = scheduleWithFields( + new Month[]{}, new int[]{}, new DayOfWeek[]{}, + new int[]{}, new int[]{0}, new int[]{0}, + "", 600 + ); + final CronTrigger trigger = ScheduleHelper.toCronTrigger(schedule); + + assertNotNull(trigger); + final ZonedDateTime next = trigger.getNextRunTime(null, ZonedDateTime.now()); + assertNotNull("Builder-style trigger should compute a next run time", next); + } + + @Test + 
public void singleScheduleToTrigger() { + final Schedule schedule = scheduleWithCron("* * * * *", ""); + final ZonedTrigger trigger = ScheduleHelper.toTrigger(new Schedule[]{schedule}); + + assertNotNull(trigger); + final ZonedDateTime next = trigger.getNextRunTime(null, ZonedDateTime.now()); + assertNotNull(next); + } + + @Test + public void compositeSchedulePicksEarliest() { + // every minute vs every hour — composite should pick the every-minute one + final Schedule everyMinute = scheduleWithCron("* * * * *", ""); + final Schedule everyHour = scheduleWithCron("0 * * * *", ""); + + final ZonedTrigger trigger = ScheduleHelper.toTrigger(new Schedule[]{everyMinute, everyHour}); + assertNotNull(trigger); + + final ZonedDateTime next = trigger.getNextRunTime(null, ZonedDateTime.now()); + assertNotNull("Composite trigger should return a next run time", next); + + // the composite should return the nearest time (every minute) + final ZonedDateTime everyMinuteNext = new CronTrigger("* * * * *", ZoneId.systemDefault()) + .getNextRunTime(null, ZonedDateTime.now()); + assertTrue("Composite should pick the earlier schedule", + !next.isAfter(everyMinuteNext.plusSeconds(1))); + } + + @Test + public void skipIfLateBySkipsLateExecution() { + final Schedule schedule = scheduleWithCron("* * * * *", "", 1); // 1 second threshold + final ZonedTrigger trigger = ScheduleHelper.toTrigger(new Schedule[]{schedule}); + + // Simulate a scheduled run time that was 10 seconds ago + final ZonedDateTime pastScheduledTime = ZonedDateTime.now().minusSeconds(10); + final boolean shouldSkip = trigger.skipRun(null, pastScheduledTime); + assertTrue("Should skip execution that is late by more than threshold", shouldSkip); + } + + @Test + public void skipIfLateByAllowsOnTimeExecution() { + final Schedule schedule = scheduleWithCron("* * * * *", "", 600); // 600 second threshold + final ZonedTrigger trigger = ScheduleHelper.toTrigger(new Schedule[]{schedule}); + + // Simulate a scheduled run time that 
is now + final ZonedDateTime now = ZonedDateTime.now(); + final boolean shouldSkip = trigger.skipRun(null, now); + assertFalse("Should not skip execution that is on time", shouldSkip); + } + + @Test + public void zeroSkipIfLateByReturnsUnwrappedTrigger() { + final Schedule schedule = scheduleWithCron("* * * * *", "", 0); + final ZonedTrigger trigger = ScheduleHelper.toTrigger(new Schedule[]{schedule}); + + // With skipIfLateBy=0, should get a plain CronTrigger (no wrapping) + assertTrue("Zero skipIfLateBy should return CronTrigger directly", + trigger instanceof CronTrigger); + } + + @Test + public void defaultZoneUsedWhenEmpty() { + final Schedule schedule = scheduleWithCron("* * * * *", ""); + final CronTrigger trigger = ScheduleHelper.toCronTrigger(schedule); + + assertNotNull(trigger.getZoneId()); + } + + // --- Annotation stubs --- + + private static Schedule scheduleWithCron(final String cron, final String zone) { + return scheduleWithCron(cron, zone, 600); + } + + private static Schedule scheduleWithCron(final String cron, final String zone, final long skipIfLateBy) { + return new Schedule() { + @Override + public Class<? 
extends Annotation> annotationType() { + return Schedule.class; + } + + @Override + public String cron() { + return cron; + } + + @Override + public Month[] months() { + return new Month[0]; + } + + @Override + public int[] daysOfMonth() { + return new int[0]; + } + + @Override + public DayOfWeek[] daysOfWeek() { + return new DayOfWeek[0]; + } + + @Override + public int[] hours() { + return new int[0]; + } + + @Override + public int[] minutes() { + return new int[0]; + } + + @Override + public int[] seconds() { + return new int[0]; + } + + @Override + public long skipIfLateBy() { + return skipIfLateBy; + } + + @Override + public String zone() { + return zone; + } + }; + } + + private static Schedule scheduleWithFields(final Month[] months, final int[] daysOfMonth, + final DayOfWeek[] daysOfWeek, final int[] hours, + final int[] minutes, final int[] seconds, + final String zone, final long skipIfLateBy) { + return new Schedule() { + @Override + public Class<? extends Annotation> annotationType() { + return Schedule.class; + } + + @Override + public String cron() { + return ""; + } + + @Override + public Month[] months() { + return months; + } + + @Override + public int[] daysOfMonth() { + return daysOfMonth; + } + + @Override + public DayOfWeek[] daysOfWeek() { + return daysOfWeek; + } + + @Override + public int[] hours() { + return hours; + } + + @Override + public int[] minutes() { + return minutes; + } + + @Override + public int[] seconds() { + return seconds; + } + + @Override + public long skipIfLateBy() { + return skipIfLateBy; + } + + @Override + public String zone() { + return zone; + } + }; + } +}
