This is an automated email from the ASF dual-hosted git repository. ilgrosso pushed a commit to branch 4_0_X in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/4_0_X by this push: new 72ec415b1c VirtualThreadPoolTaskExecutor improvements 72ec415b1c is described below commit 72ec415b1c38fb46a91e456d60ed9446930050bf Author: Francesco Chicchiriccò <ilgro...@apache.org> AuthorDate: Thu Jul 3 15:47:32 2025 +0200 VirtualThreadPoolTaskExecutor improvements --- .../core/rest/cxf/IdRepoRESTCXFContext.java | 5 +- .../core/rest/cxf/service/SyncopeServiceImpl.java | 6 +- .../provisioning/java/ProvisioningContext.java | 8 +-- .../provisioning/java/job/MacroJobDelegate.java | 4 +- .../PriorityPropagationTaskExecutor.java | 6 +- .../spring/task/VirtualThreadPoolTaskExecutor.java | 66 ++++++++++++++++++++-- 6 files changed, 75 insertions(+), 20 deletions(-) diff --git a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java index 530becbf10..231a64f2a4 100644 --- a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java +++ b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java @@ -136,6 +136,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.core.env.Environment; +import org.springframework.core.task.AsyncTaskExecutor; @PropertySource("classpath:errorMessages.properties") @EnableConfigurationProperties(RESTProperties.class) @@ -145,7 +146,7 @@ public class IdRepoRESTCXFContext { private static final Logger LOG = LoggerFactory.getLogger(IdRepoRESTCXFContext.class); @Bean - public VirtualThreadPoolTaskExecutor batchExecutor(final RESTProperties props) { + public AsyncTaskExecutor batchExecutor(final RESTProperties props) { VirtualThreadPoolTaskExecutor executor = new VirtualThreadPoolTaskExecutor(); executor.setPoolSize(props.getBatchExecutor().getPoolSize()); executor.setAwaitTerminationSeconds(props.getBatchExecutor().getAwaitTerminationSeconds()); @@ -463,7 +464,7 @@ public class IdRepoRESTCXFContext { final Bus bus, final SyncopeLogic syncopeLogic, @Qualifier("batchExecutor") - final VirtualThreadPoolTaskExecutor batchExecutor, + final AsyncTaskExecutor batchExecutor, final BatchDAO batchDAO, final EntityFactory entityFactory) { diff --git a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java index 1a3d7ff551..785c0a10fd 100644 --- a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java +++ b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java @@ -51,7 +51,7 @@ import org.apache.syncope.core.persistence.api.entity.Batch; import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.rest.cxf.batch.BatchProcess; import org.apache.syncope.core.spring.security.AuthContextUtils; -import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; @@ -61,7 +61,7 @@ public class SyncopeServiceImpl extends AbstractService implements SyncopeServic protected final SyncopeLogic logic; - protected final VirtualThreadPoolTaskExecutor batchExecutor; + protected final AsyncTaskExecutor batchExecutor; protected final Bus bus; @@ -71,7 +71,7 @@ public class SyncopeServiceImpl extends AbstractService implements SyncopeServic public SyncopeServiceImpl( final SyncopeLogic logic, - final VirtualThreadPoolTaskExecutor batchExecutor, + final AsyncTaskExecutor batchExecutor, final Bus bus, final BatchDAO batchDAO, final EntityFactory entityFactory) { diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java index 47b0a67cd8..88a3d2fcad 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java @@ -195,7 +195,7 @@ public class ProvisioningContext { */ @Bean @Primary - public VirtualThreadPoolTaskExecutor asyncConnectorFacadeExecutor(final ProvisioningProperties props) { + public AsyncTaskExecutor asyncConnectorFacadeExecutor(final ProvisioningProperties props) { VirtualThreadPoolTaskExecutor executor = new VirtualThreadPoolTaskExecutor(); executor.setPoolSize(props.getAsyncConnectorFacadeExecutor().getPoolSize()); executor.setAwaitTerminationSeconds(props.getAsyncConnectorFacadeExecutor().getAwaitTerminationSeconds()); @@ -208,7 +208,7 @@ public class ProvisioningContext { @Bean public AsyncConfigurer asyncConfigurer( @Qualifier("asyncConnectorFacadeExecutor") - final VirtualThreadPoolTaskExecutor asyncConnectorFacadeExecutor) { + final AsyncTaskExecutor asyncConnectorFacadeExecutor) { return new AsyncConfigurer() { @@ -226,7 +226,7 @@ public class ProvisioningContext { * @return executor thread pool task executor */ @Bean - public VirtualThreadPoolTaskExecutor propagationTaskExecutorAsyncExecutor(final ProvisioningProperties props) { + public AsyncTaskExecutor propagationTaskExecutorAsyncExecutor(final ProvisioningProperties props) { VirtualThreadPoolTaskExecutor executor = new VirtualThreadPoolTaskExecutor(); executor.setPoolSize(props.getPropagationTaskExecutorAsyncExecutor().getPoolSize()); executor.setWaitForTasksToCompleteOnShutdown(true); @@ -459,7 +459,7 @@ public class ProvisioningContext { @Bean public PropagationTaskExecutor propagationTaskExecutor( @Qualifier("propagationTaskExecutorAsyncExecutor") - final VirtualThreadPoolTaskExecutor propagationTaskExecutorAsyncExecutor, + final AsyncTaskExecutor propagationTaskExecutorAsyncExecutor, final TaskUtilsFactory taskUtilsFactory, final AnyUtilsFactory anyUtilsFactory, final ConnectorManager connectorManager, diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java index a2fd5aa0df..e0ece47ee7 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java @@ -57,9 +57,9 @@ import org.apache.syncope.core.provisioning.api.macro.Command; import org.apache.syncope.core.provisioning.api.macro.MacroActions; import org.apache.syncope.core.provisioning.api.serialization.POJOHelper; import org.apache.syncope.core.spring.implementation.ImplementationManager; -import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor; import org.springframework.aop.support.AopUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.security.concurrent.DelegatingSecurityContextCallable; import org.springframework.util.ReflectionUtils; @@ -74,7 +74,7 @@ public class MacroJobDelegate extends AbstractSchedTaskJobDelegate<MacroTask> { protected Validator validator; @Resource(name = "batchExecutor") - protected VirtualThreadPoolTaskExecutor taskExecutor; + protected AsyncTaskExecutor taskExecutor; protected final Map<String, MacroActions> perContextActions = new ConcurrentHashMap<>(); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java index f4bfc86950..94ea5cdbcf 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java @@ -49,8 +49,8 @@ import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo; import org.apache.syncope.core.provisioning.java.pushpull.OutboundMatcher; import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils; import org.apache.syncope.core.spring.security.AuthContextUtils; -import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.security.core.GrantedAuthority; import org.springframework.security.core.context.SecurityContextHolder; @@ -63,7 +63,7 @@ import org.springframework.security.core.context.SecurityContextHolder; */ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExecutor { - protected final VirtualThreadPoolTaskExecutor taskExecutor; + protected final AsyncTaskExecutor taskExecutor; public PriorityPropagationTaskExecutor( final ConnectorManager connectorManager, @@ -79,7 +79,7 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec final OutboundMatcher outboundMatcher, final PlainAttrValidationManager validator, final ApplicationEventPublisher publisher, - final VirtualThreadPoolTaskExecutor taskExecutor) { + final AsyncTaskExecutor taskExecutor) { super(connectorManager, connObjectUtils, diff --git a/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java b/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java index 6bcf57d0a8..fea02c8f2b 100644 --- a/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java +++ b/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java @@ -18,16 +18,18 @@ */ package org.apache.syncope.core.spring.task; +import java.util.List; import java.util.Optional; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskDecorator; -import org.springframework.core.task.support.ExecutorServiceAdapter; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport; @@ -39,6 +41,8 @@ public class VirtualThreadPoolTaskExecutor private int poolSize = -1; + private long taskTerminationTimeout; + private TaskDecorator taskDecorator; private SimpleAsyncTaskExecutor executor; @@ -53,10 +57,24 @@ public class VirtualThreadPoolTaskExecutor } /** - * @return the maximum number of managed threads + * Specify a timeout (in milliseconds) for task termination when closing + * this executor. The default is 0, not waiting for task termination at all. + * <p> + * Note that a concrete >0 timeout specified here will lead to the + * wrapping of every submitted task into a task-tracking runnable which + * involves considerable overhead in case of a high number of tasks. + * However, for a modest level of submissions with longer-running + * tasks, this is feasible in order to arrive at a graceful shutdown. + * <p> + * Note that {@code SimpleAsyncTaskExecutor} does not participate in + * a coordinated lifecycle stop but rather just awaits task termination + * on {@link #close()}. + * + * @param taskTerminationTimeout the timeout in milliseconds + * @see SimpleAsyncTaskExecutor#close */ - public int getPoolSize() { - return poolSize; + public void setTaskTerminationTimeout(final long taskTerminationTimeout) { + this.taskTerminationTimeout = taskTerminationTimeout; } /** @@ -94,10 +112,46 @@ public class VirtualThreadPoolTaskExecutor executor = new SimpleAsyncTaskExecutor(getThreadNamePrefix()); executor.setVirtualThreads(true); + executor.setDaemon(true); executor.setConcurrencyLimit(poolSize); + if (taskTerminationTimeout >= 0) { + executor.setTaskTerminationTimeout(taskTerminationTimeout); + } Optional.ofNullable(taskDecorator).ifPresent(executor::setTaskDecorator); - return new ExecutorServiceAdapter(executor); + return new AbstractExecutorService() { + + @Override + public void execute(final Runnable task) { + executor.execute(task); + } + + @Override + public void shutdown() { + executor.close(); + } + + @Override + public List<Runnable> shutdownNow() { + executor.close(); + return List.of(); + } + + @Override + public boolean isShutdown() { + return !executor.isActive(); + } + + @Override + public boolean isTerminated() { + return !executor.isActive(); + } + + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + return !executor.isActive(); + } + }; } @Override @@ -117,6 +171,6 @@ public class VirtualThreadPoolTaskExecutor @Override public void shutdown() { - // manual shutdown is not supported + executor.close(); } }